github-actions[bot] commented on code in PR #61329:
URL: https://github.com/apache/doris/pull/61329#discussion_r2935244840


##########
be/src/io/fs/oss_file_writer.cpp:
##########
@@ -0,0 +1,503 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/oss_file_writer.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <bvar/reducer.h>
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <sstream>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/path.h"
+#include "io/fs/s3_file_bufferpool.h"
+#include "runtime/exec_env.h"
+#include "util/debug_points.h"
+
+namespace doris::io {
+
+// Existing metrics
+bvar::Adder<uint64_t> oss_file_writer_total("oss_file_writer_total_num");
+bvar::Adder<uint64_t> oss_bytes_written_total("oss_file_writer_bytes_written");
+bvar::Adder<uint64_t> oss_file_created_total("oss_file_writer_file_created");
+bvar::Adder<uint64_t> 
oss_file_being_written("oss_file_writer_file_being_written");
+
+// New async metrics
+bvar::Adder<int64_t> 
oss_file_writer_async_close_queuing("oss_file_writer_async_close_queuing");
+bvar::Adder<int64_t> oss_file_writer_async_close_processing(
+        "oss_file_writer_async_close_processing");
+
+OSSFileWriter::OSSFileWriter(std::shared_ptr<OSSClientHolder> client, 
std::string bucket,
+                             std::string key, const FileWriterOptions* opts)
+        : _path(fmt::format("oss://{}/{}", bucket, key)),
+          _bucket(std::move(bucket)),
+          _key(std::move(key)),
+          _client(std::move(client)),
+          _buffer_size(config::s3_write_buffer_size),
+          _used_by_oss_committer(opts ? opts->used_by_s3_committer : false) {
+    oss_file_writer_total << 1;
+    oss_file_being_written << 1;
+
+    _completed_parts.reserve(100);
+
+    init_cache_builder(opts, _path);
+}
+
+OSSFileWriter::~OSSFileWriter() {
+    // Wait for any pending async operations to complete
+    _wait_until_finish("~OSSFileWriter");
+
+    // Abort multipart upload if not completed
+    if (state() != State::CLOSED && !_upload_id.empty()) {
+        LOG(WARNING) << "OSSFileWriter destroyed without close(), aborting 
multipart upload: "
+                     << _path.native() << " upload_id: " << _upload_id;
+        static_cast<void>(_abort_multipart_upload());
+    }
+
+    if (state() == State::OPENED && !_failed) {
+        oss_bytes_written_total << _bytes_appended;
+    }
+    oss_file_being_written << -1;
+}
+
+Status OSSFileWriter::appendv(const Slice* data, size_t data_cnt) {
+    if (state() != State::OPENED) {
+        return Status::InternalError("append to closed file: {}", 
_path.native());
+    }
+
+    for (size_t i = 0; i < data_cnt; i++) {
+        size_t data_size = data[i].size;
+        const char* data_ptr = data[i].data;
+        size_t pos = 0;
+
+        while (pos < data_size) {
+            if (_failed) {
+                return _st;
+            }
+
+            // Create new buffer if needed
+            if (!_pending_buf) {
+                RETURN_IF_ERROR(_build_upload_buffer());
+            }
+
+            size_t remaining = data_size - pos;
+            size_t buffer_remaining = _buffer_size - _pending_buf->get_size();
+            size_t to_append = std::min(remaining, buffer_remaining);
+
+            Slice s(data_ptr + pos, to_append);
+            RETURN_IF_ERROR(_pending_buf->append_data(s));
+
+            pos += to_append;
+            _bytes_appended += to_append;
+
+            if (_pending_buf->get_size() == _buffer_size) {
+                // Create multipart upload on first buffer flush
+                if (_cur_part_num == 1) {
+                    RETURN_IF_ERROR(_create_multipart_upload());
+                }
+
+                _cur_part_num++;
+                _countdown_event.add_count();
+                RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
+                _pending_buf = nullptr;
+            }
+        }
+    }
+
+    return Status::OK();
+}
+
+Status OSSFileWriter::_build_upload_buffer() {
+    auto builder = FileBufferBuilder();
+    builder.set_type(BufferType::UPLOAD)
+            .set_upload_callback([part_num = _cur_part_num, 
this](UploadFileBuffer& buf) {
+                _upload_one_part(part_num, buf);
+            })
+            .set_file_offset(_bytes_appended)
+            .set_sync_after_complete_task([this](auto&& s) {
+                return 
_complete_part_task_callback(std::forward<decltype(s)>(s));
+            })
+            .set_is_cancelled([this]() { return _failed.load(); });
+
+    if (cache_builder() != nullptr) {
+        int64_t tablet_id = get_tablet_id(_path.native()).value_or(0);
+        builder.set_allocate_file_blocks_holder([builder = *cache_builder(),
+                                                 offset = _bytes_appended,
+                                                 tablet_id = tablet_id]() -> 
FileBlocksHolderPtr {
+            return builder.allocate_cache_holder(offset, 
config::s3_write_buffer_size, tablet_id);
+        });
+    }
+
+    RETURN_IF_ERROR(builder.build(&_pending_buf));
+    return Status::OK();
+}
+
+void OSSFileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
+    if (buf.is_cancelled()) {
+        LOG(INFO) << "OSS file " << _path.native() << " skip part " << part_num
+                  << " because previous failure";
+        return;
+    }
+
+    // Debug point: Simulate upload failure
+    DBUG_EXECUTE_IF("OSSFileWriter::_upload_one_part.upload_error", {
+        auto fail_part = dp->param<int64_t>("fail_part_num", 0);
+        if (fail_part == 0 || fail_part == part_num) {
+            LOG(WARNING) << "Debug point: Simulating OSS upload failure for 
part " << part_num;
+            buf.set_status(Status::IOError("Debug OSS upload error for part 
{}", part_num));
+            return;
+        }
+    });
+
+    // Debug point: Simulate slow upload
+    DBUG_EXECUTE_IF("OSSFileWriter::_upload_one_part.slow_upload", {
+        auto sleep_ms = dp->param<int>("sleep_ms", 1000);
+        LOG(INFO) << "Debug point: Simulating slow OSS upload, sleeping " << 
sleep_ms << "ms";
+        std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+    });
+
+    auto client = _client->get();
+    if (nullptr == client) {
+        buf.set_status(Status::InternalError("OSS client not initialized"));
+        return;
+    }
+
+    auto stream = buf.get_stream();
+    if (!stream) {
+        buf.set_status(Status::InternalError("Failed to get stream from upload 
buffer for part {}",
+                                             part_num));
+        return;
+    }
+
+    AlibabaCloud::OSS::UploadPartRequest request(_bucket, _key, stream);
+    request.setPartNumber(static_cast<int32_t>(part_num));
+    request.setUploadId(_upload_id);
+    request.setContentLength(buf.get_size());
+
+    auto outcome = client->UploadPart(request);
+    if (!outcome.isSuccess()) {
+        std::string err = fmt::format("OSS UploadPart {} failed: {} - {}", 
part_num,
+                                      outcome.error().Code(), 
outcome.error().Message());
+        LOG(WARNING) << err << ", path: " << _path.native();
+        buf.set_status(Status::IOError(err));
+        return;
+    }
+
+    oss_bytes_written_total << buf.get_size();
+
+    AlibabaCloud::OSS::Part part(static_cast<int32_t>(part_num), 
outcome.result().ETag());
+
+    {
+        std::lock_guard<std::mutex> lock(_completed_lock);
+        _completed_parts.push_back(part);
+    }
+
+    VLOG_DEBUG << "OSS UploadPart " << part_num << " completed: " << 
_path.native()
+               << " size: " << buf.get_size();
+}
+
+bool OSSFileWriter::_complete_part_task_callback(Status s) {
+    if (!s.ok()) {
+        _failed = true;
+        _st = std::move(s);
+        LOG(WARNING) << "OSS async upload failed: " << _path.native() << " 
error: " << _st;

Review Comment:
   **HIGH — Data race: `_st` written without holding `_completed_lock`**
   
   The S3 equivalent (`S3FileWriter::_complete_part_task_callback`) holds 
`_completed_lock` when writing both `_failed` and `_st`:
   ```cpp
   // S3 (correct):
   std::unique_lock<std::mutex> _lck {_completed_lock};
   _failed = true;
   _st = std::move(s);
   ```
   
   Here, multiple concurrent upload-part callbacks can race on writing `_st` (a 
non-atomic `Status` object), which is undefined behavior. Please add the same 
lock discipline as S3.



##########
cloud/src/recycler/oss_accessor.cpp:
##########
@@ -0,0 +1,640 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/oss_accessor.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <alibabacloud/oss/client/ClientConfiguration.h>
+#include <bvar/reducer.h>
+#include <gen_cpp/cloud.pb.h>
+#include <time.h>
+
+#include <iomanip>
+#include <memory>
+#include <sstream>
+#include <stdexcept>
+#include <utility>
+
+#include "common/config.h"
+#include "common/encryption_util.h"
+#include "common/logging.h"
+#include "common/stopwatch.h"
+#include "common/string_util.h"
+#include "common/util.h"
+#include "cpp/aws_common.h"
+#include "cpp/oss_credential_provider.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+namespace oss_bvar {
+bvar::LatencyRecorder oss_get_latency("oss_get");
+bvar::LatencyRecorder oss_put_latency("oss_put");
+bvar::LatencyRecorder oss_delete_object_latency("oss_delete_object");
+bvar::LatencyRecorder oss_delete_objects_latency("oss_delete_objects");
+bvar::LatencyRecorder oss_head_latency("oss_head");
+bvar::LatencyRecorder oss_list_latency("oss_list");
+} // namespace oss_bvar
+
+// Parse OSS LastModified (ISO 8601 UTC) to Unix timestamp
+static int64_t parse_oss_last_modified(const std::string& last_modified_str) {
+    std::tm tm = {};
+    std::istringstream ss(last_modified_str);
+    ss >> std::get_time(&tm, "%Y-%m-%dT%H:%M:%S");
+
+    if (ss.fail()) {
+        LOG(WARNING) << "Failed to parse OSS LastModified: " << 
last_modified_str;
+        return 0;
+    }

Review Comment:
   **HIGH — Data safety: returning 0 (epoch) on parse failure causes premature 
deletion**
   
   If `LastModified` fails to parse, returning 0 (Unix epoch, 1970-01-01) means 
the object appears maximally old. In `delete_prefix()`, objects with `mtime < 
expiration_time` are deleted. A new object whose timestamp happens to be in an 
unexpected format would be **incorrectly deleted**.
   
   Fix: Return `INT64_MAX` on parse failure (object appears "infinitely new", 
never prematurely deleted), or return a negative sentinel value and check it in 
callers:
   ```cpp
   if (ss.fail()) {
       LOG(WARNING) << "Failed to parse OSS LastModified: " << last_modified_str;
       return INT64_MAX; // Don't delete objects we can't parse
   }
   ```



##########
cloud/src/recycler/oss_accessor.cpp:
##########
@@ -0,0 +1,640 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/oss_accessor.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <alibabacloud/oss/client/ClientConfiguration.h>
+#include <bvar/reducer.h>
+#include <gen_cpp/cloud.pb.h>
+#include <time.h>
+
+#include <iomanip>
+#include <memory>
+#include <sstream>
+#include <stdexcept>
+#include <utility>
+
+#include "common/config.h"
+#include "common/encryption_util.h"
+#include "common/logging.h"
+#include "common/stopwatch.h"
+#include "common/string_util.h"
+#include "common/util.h"
+#include "cpp/aws_common.h"
+#include "cpp/oss_credential_provider.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+namespace oss_bvar {
+bvar::LatencyRecorder oss_get_latency("oss_get");
+bvar::LatencyRecorder oss_put_latency("oss_put");
+bvar::LatencyRecorder oss_delete_object_latency("oss_delete_object");
+bvar::LatencyRecorder oss_delete_objects_latency("oss_delete_objects");
+bvar::LatencyRecorder oss_head_latency("oss_head");
+bvar::LatencyRecorder oss_list_latency("oss_list");
+} // namespace oss_bvar
+
+// Parse OSS LastModified (ISO 8601 UTC) to Unix timestamp
+static int64_t parse_oss_last_modified(const std::string& last_modified_str) {
+    std::tm tm = {};
+    std::istringstream ss(last_modified_str);
+    ss >> std::get_time(&tm, "%Y-%m-%dT%H:%M:%S");
+
+    if (ss.fail()) {
+        LOG(WARNING) << "Failed to parse OSS LastModified: " << 
last_modified_str;
+        return 0;
+    }
+
+    return static_cast<int64_t>(timegm(&tm)); // timegm() for UTC time
+}
+
+// OSS List Iterator implementation
+class OSSListIterator final : public ListIterator {
+public:
+    OSSListIterator(std::shared_ptr<AlibabaCloud::OSS::OssClient> client, 
const std::string& bucket,
+                    const std::string& prefix, size_t prefix_length)
+            : client_(std::move(client)),
+              bucket_(bucket),
+              prefix_(prefix),
+              prefix_length_(prefix_length),
+              is_truncated_(true),
+              current_index_(0) {
+        // Fetch first batch
+        fetch_next_batch();
+    }
+
+    ~OSSListIterator() override = default;
+
+    bool is_valid() override { return valid_; }
+
+    bool has_next() override {
+        if (current_index_ < objects_.size()) {
+            return true;
+        }
+
+        // Need to fetch next batch
+        if (is_truncated_) {
+            fetch_next_batch();
+            return current_index_ < objects_.size();
+        }
+
+        return false;
+    }
+
+    std::optional<FileMeta> next() override {
+        if (!has_next()) {
+            return std::nullopt;
+        }
+
+        const auto& obj = objects_[current_index_++];
+        return FileMeta {.path = get_relative_path(obj.Key()),
+                         .size = obj.Size(),
+                         .mtime_s = 
parse_oss_last_modified(obj.LastModified())};
+    }
+
+private:
+    void fetch_next_batch() {
+        AlibabaCloud::OSS::ListObjectsRequest request(bucket_);
+        request.setPrefix(prefix_);
+        request.setMaxKeys(1000); // OSS default max keys per request
+
+        if (!next_marker_.empty()) {
+            request.setMarker(next_marker_);
+        }
+
+        auto outcome = client_->ListObjects(request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS ListObjects failed: " << 
outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            valid_ = false;
+            is_truncated_ = false;
+            return;
+        }
+
+        const auto& result = outcome.result();
+        objects_ = result.ObjectSummarys();
+        is_truncated_ = result.IsTruncated();
+        next_marker_ = result.NextMarker();
+        current_index_ = 0;
+        valid_ = true;
+    }
+
+    std::string get_relative_path(const std::string& key) const {
+        if (key.length() >= prefix_length_) {
+            return key.substr(prefix_length_);
+        }
+        return key;
+    }
+
+    std::shared_ptr<AlibabaCloud::OSS::OssClient> client_;
+    std::string bucket_;
+    std::string prefix_;
+    size_t prefix_length_;
+    bool valid_ {false};
+    bool is_truncated_;
+    std::string next_marker_;
+    AlibabaCloud::OSS::ObjectSummaryList objects_;
+    size_t current_index_;
+};
+
+// OSSConf implementation
+
+std::optional<OSSConf> OSSConf::from_obj_store_info(const ObjectStoreInfoPB& 
obj_info,
+                                                    bool skip_aksk) {
+    // Only process OSS provider
+    if (obj_info.provider() != ObjectStoreInfoPB_Provider_OSS) {
+        return std::nullopt;
+    }
+
+    OSSConf conf;
+    conf.endpoint = normalize_oss_endpoint(obj_info.endpoint());
+    conf.bucket = obj_info.bucket();
+    conf.prefix = obj_info.prefix();
+    conf.region = obj_info.region();
+
+    if (!skip_aksk) {
+        if (obj_info.has_cred_provider_type()) {
+            switch (obj_info.cred_provider_type()) {
+            case CredProviderTypePB::INSTANCE_PROFILE:
+                conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+                break;
+            case CredProviderTypePB::DEFAULT:
+                conf.provider_type = OSSCredProviderType::DEFAULT;
+                break;
+            case CredProviderTypePB::SIMPLE:
+                conf.provider_type = OSSCredProviderType::SIMPLE;
+                if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
+                    if (obj_info.has_encryption_info()) {
+                        AkSkPair plain;
+                        int ret = decrypt_ak_sk_helper(obj_info.ak(), 
obj_info.sk(),
+                                                       
obj_info.encryption_info(), &plain);
+                        if (ret != 0) {
+                            LOG(WARNING) << "Failed to decrypt OSS ak/sk";
+                            return std::nullopt;
+                        }
+                        conf.access_key_id = std::move(plain.first);
+                        conf.access_key_secret = std::move(plain.second);
+                    } else {
+                        conf.access_key_id = obj_info.ak();
+                        conf.access_key_secret = obj_info.sk();
+                    }
+                }
+                break;
+            default:
+                conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+                break;
+            }
+        } else {
+            if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
+                conf.provider_type = OSSCredProviderType::SIMPLE;
+                if (obj_info.has_encryption_info()) {
+                    AkSkPair plain;
+                    int ret = decrypt_ak_sk_helper(obj_info.ak(), 
obj_info.sk(),
+                                                   obj_info.encryption_info(), 
&plain);
+                    if (ret != 0) {
+                        LOG(WARNING) << "Failed to decrypt OSS ak/sk";
+                        return std::nullopt;
+                    }
+                    conf.access_key_id = std::move(plain.first);
+                    conf.access_key_secret = std::move(plain.second);
+                } else {
+                    conf.access_key_id = obj_info.ak();
+                    conf.access_key_secret = obj_info.sk();
+                }
+            } else {
+                conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+            }
+        }
+
+        if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) {
+            conf.role_arn = obj_info.role_arn();
+            conf.external_id = obj_info.external_id();
+        }
+    }
+
+    return conf;
+}
+
+uint64_t OSSConf::get_hash() const {
+    std::string hash_str = endpoint + bucket + prefix + region + access_key_id 
+ access_key_secret +
+                           role_arn + external_id + 
std::to_string(static_cast<int>(provider_type));
+    return std::hash<std::string> {}(hash_str);
+}
+
+// OSSAccessor implementation
+
+OSSAccessor::OSSAccessor(OSSConf conf)
+        : StorageVaultAccessor(AccessorType::OSS), conf_(std::move(conf)) {
+    uri_ = fmt::format("oss://{}/{}", conf_.bucket, conf_.prefix.empty() ? "" 
: conf_.prefix + "/");
+}
+
+OSSAccessor::~OSSAccessor() = default;
+
+int OSSAccessor::create(OSSConf conf, std::shared_ptr<OSSAccessor>* accessor) {
+    *accessor = std::make_shared<OSSAccessor>(std::move(conf));
+    return (*accessor)->init();
+}
+
+int OSSAccessor::init() {
+    static std::once_flag init_flag;
+    std::call_once(init_flag, []() {
+        AlibabaCloud::OSS::InitializeSdk();
+        LOG(INFO) << "Alibaba Cloud OSS SDK initialized";
+    });
+    _ca_cert_file_path =
+            
get_valid_ca_cert_path(doris::cloud::split(config::ca_cert_file_paths, ';'));
+    return create_oss_client();
+}
+
+int OSSAccessor::create_oss_client() {
+    std::lock_guard<std::mutex> lock(client_mutex_);
+
+    AlibabaCloud::OSS::ClientConfiguration oss_config;
+    oss_config.maxConnections = conf_.max_connections;
+    oss_config.connectTimeoutMs = conf_.connect_timeout_ms;
+    oss_config.requestTimeoutMs = conf_.request_timeout_ms;
+
+    if (_ca_cert_file_path.empty()) {
+        _ca_cert_file_path =
+                
get_valid_ca_cert_path(doris::cloud::split(config::ca_cert_file_paths, ';'));
+    }
+    if (!_ca_cert_file_path.empty()) {
+        oss_config.caFile = _ca_cert_file_path;
+    }
+
+    try {
+        if (conf_.provider_type == OSSCredProviderType::INSTANCE_PROFILE) {
+            if (!conf_.role_arn.empty()) {
+                if (!sts_credential_provider_) {
+                    std::string region = conf_.region.empty() ? "cn-hangzhou" 
: conf_.region;
+                    sts_credential_provider_ = 
std::make_shared<OSSSTSCredentialProvider>(
+                            conf_.role_arn, region, conf_.external_id);
+                }
+                oss_client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
+                        conf_.endpoint,
+                        
std::static_pointer_cast<AlibabaCloud::OSS::CredentialsProvider>(
+                                sts_credential_provider_),
+                        oss_config);
+                LOG(INFO) << "OSS client created with AssumeRole, endpoint=" 
<< conf_.endpoint
+                          << ", role_arn=" << conf_.role_arn;
+            } else {
+                if (!credentials_provider_) {
+                    credentials_provider_ = 
std::make_shared<ECSMetadataCredentialsProvider>();
+                }
+                oss_client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
+                        conf_.endpoint,
+                        
std::static_pointer_cast<AlibabaCloud::OSS::CredentialsProvider>(
+                                credentials_provider_),
+                        oss_config);
+                LOG(INFO) << "OSS client created with INSTANCE_PROFILE, 
endpoint="
+                          << conf_.endpoint;
+            }
+        } else if (conf_.provider_type == OSSCredProviderType::DEFAULT) {
+            if (!default_credential_provider_) {
+                default_credential_provider_ = 
std::make_shared<OSSDefaultCredentialsProvider>();
+            }
+            oss_client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
+                    conf_.endpoint,
+                    
std::static_pointer_cast<AlibabaCloud::OSS::CredentialsProvider>(
+                            default_credential_provider_),
+                    oss_config);
+            LOG(INFO) << "OSS client created with DEFAULT provider, endpoint=" 
<< conf_.endpoint;
+        } else if (conf_.provider_type == OSSCredProviderType::SIMPLE) {
+            AlibabaCloud::OSS::Credentials creds(conf_.access_key_id, 
conf_.access_key_secret,
+                                                 conf_.security_token);
+            oss_client_ = 
std::make_shared<AlibabaCloud::OSS::OssClient>(conf_.endpoint, creds,
+                                                                         
oss_config);
+            LOG(INFO) << "OSS client created with SIMPLE credentials, 
endpoint=" << conf_.endpoint;
+        } else {
+            LOG(ERROR) << "Unsupported OSS credential provider type";
+            return -1;
+        }
+    } catch (const std::exception& e) {
+        LOG(ERROR) << "Failed to create OSS client: " << e.what();
+        return -1;
+    }
+
+    return 0;
+}
+
+int OSSAccessor::refresh_client_if_needed() {
+    // Credential providers manage their own refresh internally via 
getCredentials().
+    return 0;
+}
+
+std::string OSSAccessor::get_key(const std::string& relative_path) const {
+    std::string path = relative_path;
+    if (!path.empty() && path[0] == '/') {
+        LOG(WARNING) << "OSS relative path should not start with '/': " << 
relative_path;
+        path = path.substr(1);
+    }
+    return conf_.prefix.empty() ? path : conf_.prefix + "/" + path;
+}
+
+std::string OSSAccessor::to_uri(const std::string& relative_path) const {
+    return uri_ + relative_path;
+}
+
+int OSSAccessor::convert_oss_error_code(const std::string& error_code) const {
+    if (error_code == "NoSuchKey" || error_code == "NoSuchBucket") {
+        return 1; // Not found
+    } else if (error_code == "AccessDenied" || error_code == 
"InvalidAccessKeyId") {
+        return -2; // Access denied
+    } else if (error_code == "SecurityTokenExpired") {
+        LOG(WARNING) << "OSS security token expired, will refresh on next 
operation";
+        return -3; // Credentials expired
+    } else if (error_code == "RequestTimeout" || error_code == 
"ConnectionTimeout") {
+        return -4; // Timeout
+    }
+
+    LOG(WARNING) << "OSS operation failed with error code: " << error_code;
+    return -1; // Generic error
+}
+
+std::shared_ptr<AlibabaCloud::OSS::OssClient> OSSAccessor::get_client() const {
+    std::lock_guard<std::mutex> lock(client_mutex_);
+    return oss_client_;
+}
+
+int OSSAccessor::put_file(const std::string& path, const std::string& content) 
{
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_put_latency);
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string key = get_key(path);
+
+    // Create stream from content
+    std::shared_ptr<std::stringstream> content_stream =
+            std::make_shared<std::stringstream>(content);
+
+    AlibabaCloud::OSS::PutObjectRequest request(conf_.bucket, key, 
content_stream);
+
+    auto outcome = client->PutObject(request);
+    if (!outcome.isSuccess()) {
+        LOG(WARNING) << "OSS PutObject failed: " << outcome.error().Code() << 
" - "
+                     << outcome.error().Message() << ", key: " << key;
+        return convert_oss_error_code(outcome.error().Code());
+    }
+
+    VLOG(1) << "OSS PutObject success: " << key << ", size: " << 
content.size();
+    return 0;
+}
+
+int OSSAccessor::delete_file(const std::string& path) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_delete_object_latency);
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string key = get_key(path);
+
+    auto outcome = client->DeleteObject(conf_.bucket, key);
+    if (!outcome.isSuccess()) {
+        // OSS DeleteObject returns success even if object doesn't exist
+        // Only log real errors
+        std::string error_code = outcome.error().Code();
+        if (error_code != "NoSuchKey") {
+            LOG(WARNING) << "OSS DeleteObject failed: " << error_code << " - "
+                         << outcome.error().Message() << ", key: " << key;
+            return convert_oss_error_code(error_code);
+        }
+    }
+
+    VLOG(1) << "OSS DeleteObject success: " << key;
+    return 0;
+}
+
+int OSSAccessor::delete_files(const std::vector<std::string>& paths) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_delete_objects_latency);
+
+    if (paths.empty()) {
+        return 0;
+    }
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    // OSS DeleteObjects supports batch delete (max 1000 keys per request)
+    const size_t batch_size = 1000;
+
+    for (size_t i = 0; i < paths.size(); i += batch_size) {
+        size_t end = std::min(i + batch_size, paths.size());
+
+        AlibabaCloud::OSS::DeletedKeyList keys;
+        for (size_t j = i; j < end; ++j) {
+            keys.push_back(get_key(paths[j]));
+        }
+
+        AlibabaCloud::OSS::DeleteObjectsRequest request(conf_.bucket);
+        request.setKeyList(keys);
+
+        auto outcome = client->DeleteObjects(request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS DeleteObjects failed: " << 
outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            return convert_oss_error_code(outcome.error().Code());
+        }
+
+        VLOG(1) << "OSS DeleteObjects success: deleted " << 
outcome.result().keyList().size()
+                << " objects (" << (end - i) << " requested)";
+    }
+
+    return 0;
+}
+
+int OSSAccessor::delete_prefix(const std::string& path_prefix, int64_t 
expiration_time) {
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string prefix = get_key(path_prefix);
+
+    // List all objects with prefix and delete them in batches
+    std::vector<std::string> keys_to_delete;
+    const size_t batch_size = 1000;
+
+    bool is_truncated = true;
+    std::string marker;
+
+    while (is_truncated) {
+        AlibabaCloud::OSS::ListObjectsRequest list_request(conf_.bucket);
+        list_request.setPrefix(prefix);
+        list_request.setMaxKeys(1000);
+
+        if (!marker.empty()) {
+            list_request.setMarker(marker);
+        }
+
+        auto outcome = client->ListObjects(list_request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS ListObjects failed: " << 
outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            return convert_oss_error_code(outcome.error().Code());
+        }
+
+        const auto& result = outcome.result();
+        const auto& objects = result.ObjectSummarys();
+
+        for (const auto& obj : objects) {
+            // Check expiration time if specified
+            if (expiration_time > 0) {
+                int64_t obj_mtime = 
parse_oss_last_modified(obj.LastModified());
+                if (obj_mtime >= expiration_time) {
+                    continue; // Skip objects newer than expiration time
+                }
+            }
+
+            keys_to_delete.push_back(obj.Key());
+
+            // Delete in batches
+            if (keys_to_delete.size() >= batch_size) {
+                AlibabaCloud::OSS::DeletedKeyList 
batch_keys(keys_to_delete.begin(),
+                                                             
keys_to_delete.end());
+                AlibabaCloud::OSS::DeleteObjectsRequest 
delete_request(conf_.bucket);
+                delete_request.setKeyList(batch_keys);
+
+                auto delete_outcome = client->DeleteObjects(delete_request);
+                if (!delete_outcome.isSuccess()) {
+                    LOG(WARNING) << "OSS DeleteObjects failed: " << 
delete_outcome.error().Code()
+                                 << " - " << delete_outcome.error().Message();
+                    return 
convert_oss_error_code(delete_outcome.error().Code());
+                }
+
+                VLOG(1) << "OSS deleted batch of " << 
delete_outcome.result().keyList().size()
+                        << " objects";
+                keys_to_delete.clear();
+            }
+        }
+
+        is_truncated = result.IsTruncated();
+        marker = result.NextMarker();
+    }
+
+    // Delete remaining keys
+    if (!keys_to_delete.empty()) {
+        AlibabaCloud::OSS::DeletedKeyList batch_keys(keys_to_delete.begin(), 
keys_to_delete.end());
+        AlibabaCloud::OSS::DeleteObjectsRequest delete_request(conf_.bucket);
+        delete_request.setKeyList(batch_keys);
+
+        auto delete_outcome = client->DeleteObjects(delete_request);
+        if (!delete_outcome.isSuccess()) {
+            LOG(WARNING) << "OSS DeleteObjects failed: " << 
delete_outcome.error().Code() << " - "
+                         << delete_outcome.error().Message();
+            return convert_oss_error_code(delete_outcome.error().Code());
+        }
+
+        VLOG(1) << "OSS deleted final batch of " << 
delete_outcome.result().keyList().size()
+                << " objects";
+    }
+
+    return 0;
+}
+
+int OSSAccessor::delete_directory(const std::string& dir_path) {
+    // For OSS, directory is just a prefix, same as delete_prefix
+    return delete_prefix(dir_path, 0);
+}
+
+int OSSAccessor::delete_all(int64_t expiration_time) {
+    // Delete all objects under the prefix
+    return delete_prefix("", expiration_time);
+}
+
+int OSSAccessor::list_directory(const std::string& dir_path, 
std::unique_ptr<ListIterator>* res) {
+    return list_prefix(dir_path, res);
+}

Review Comment:
   **HIGH — Missing empty-path guard and trailing-slash normalization in `list_directory`**
   
   Same issue as `delete_directory`: there is no empty-path validation, so 
`list_directory("")` is forwarded unchanged to `list_prefix("")`. Additionally, 
S3Accessor appends `/` to `dir_path` if it is not already present (ensuring 
`list_directory("data/100")` doesn't match `data/10000/...`), whereas this 
implementation passes `dir_path` through as-is. The test at lines 104-106 
asserts `ASSERT_NE(ret, 0)` for `list_directory("")`, which will fail.



##########
cloud/src/recycler/oss_accessor.cpp:
##########
@@ -0,0 +1,640 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/oss_accessor.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <alibabacloud/oss/client/ClientConfiguration.h>
+#include <bvar/reducer.h>
+#include <gen_cpp/cloud.pb.h>
+#include <time.h>
+
+#include <iomanip>
+#include <memory>
+#include <sstream>
+#include <stdexcept>
+#include <utility>
+
+#include "common/config.h"
+#include "common/encryption_util.h"
+#include "common/logging.h"
+#include "common/stopwatch.h"
+#include "common/string_util.h"
+#include "common/util.h"
+#include "cpp/aws_common.h"
+#include "cpp/oss_credential_provider.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+namespace oss_bvar {
+bvar::LatencyRecorder oss_get_latency("oss_get");
+bvar::LatencyRecorder oss_put_latency("oss_put");
+bvar::LatencyRecorder oss_delete_object_latency("oss_delete_object");
+bvar::LatencyRecorder oss_delete_objects_latency("oss_delete_objects");
+bvar::LatencyRecorder oss_head_latency("oss_head");
+bvar::LatencyRecorder oss_list_latency("oss_list");
+} // namespace oss_bvar
+
+// Parse OSS LastModified (ISO 8601 UTC) to Unix timestamp
+static int64_t parse_oss_last_modified(const std::string& last_modified_str) {
+    std::tm tm = {};
+    std::istringstream ss(last_modified_str);
+    ss >> std::get_time(&tm, "%Y-%m-%dT%H:%M:%S");
+
+    if (ss.fail()) {
+        LOG(WARNING) << "Failed to parse OSS LastModified: " << 
last_modified_str;
+        return 0;
+    }
+
+    return static_cast<int64_t>(timegm(&tm)); // timegm() for UTC time
+}
+
+// OSS List Iterator implementation
+class OSSListIterator final : public ListIterator {
+public:
+    OSSListIterator(std::shared_ptr<AlibabaCloud::OSS::OssClient> client, 
const std::string& bucket,
+                    const std::string& prefix, size_t prefix_length)
+            : client_(std::move(client)),
+              bucket_(bucket),
+              prefix_(prefix),
+              prefix_length_(prefix_length),
+              is_truncated_(true),
+              current_index_(0) {
+        // Fetch first batch
+        fetch_next_batch();
+    }
+
+    ~OSSListIterator() override = default;
+
+    bool is_valid() override { return valid_; }
+
+    bool has_next() override {
+        if (current_index_ < objects_.size()) {
+            return true;
+        }
+
+        // Need to fetch next batch
+        if (is_truncated_) {
+            fetch_next_batch();
+            return current_index_ < objects_.size();
+        }
+
+        return false;
+    }
+
+    std::optional<FileMeta> next() override {
+        if (!has_next()) {
+            return std::nullopt;
+        }
+
+        const auto& obj = objects_[current_index_++];
+        return FileMeta {.path = get_relative_path(obj.Key()),
+                         .size = obj.Size(),
+                         .mtime_s = 
parse_oss_last_modified(obj.LastModified())};
+    }
+
+private:
+    void fetch_next_batch() {
+        AlibabaCloud::OSS::ListObjectsRequest request(bucket_);
+        request.setPrefix(prefix_);
+        request.setMaxKeys(1000); // OSS default max keys per request
+
+        if (!next_marker_.empty()) {
+            request.setMarker(next_marker_);
+        }
+
+        auto outcome = client_->ListObjects(request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS ListObjects failed: " << 
outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            valid_ = false;
+            is_truncated_ = false;
+            return;
+        }
+
+        const auto& result = outcome.result();
+        objects_ = result.ObjectSummarys();
+        is_truncated_ = result.IsTruncated();
+        next_marker_ = result.NextMarker();
+        current_index_ = 0;
+        valid_ = true;
+    }
+
+    std::string get_relative_path(const std::string& key) const {
+        if (key.length() >= prefix_length_) {
+            return key.substr(prefix_length_);
+        }
+        return key;
+    }
+
+    std::shared_ptr<AlibabaCloud::OSS::OssClient> client_;
+    std::string bucket_;
+    std::string prefix_;
+    size_t prefix_length_;
+    bool valid_ {false};
+    bool is_truncated_;
+    std::string next_marker_;
+    AlibabaCloud::OSS::ObjectSummaryList objects_;
+    size_t current_index_;
+};
+
+// OSSConf implementation
+
+std::optional<OSSConf> OSSConf::from_obj_store_info(const ObjectStoreInfoPB& 
obj_info,
+                                                    bool skip_aksk) {
+    // Only process OSS provider
+    if (obj_info.provider() != ObjectStoreInfoPB_Provider_OSS) {
+        return std::nullopt;
+    }
+
+    OSSConf conf;
+    conf.endpoint = normalize_oss_endpoint(obj_info.endpoint());
+    conf.bucket = obj_info.bucket();
+    conf.prefix = obj_info.prefix();
+    conf.region = obj_info.region();
+
+    if (!skip_aksk) {
+        if (obj_info.has_cred_provider_type()) {
+            switch (obj_info.cred_provider_type()) {
+            case CredProviderTypePB::INSTANCE_PROFILE:
+                conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+                break;
+            case CredProviderTypePB::DEFAULT:
+                conf.provider_type = OSSCredProviderType::DEFAULT;
+                break;
+            case CredProviderTypePB::SIMPLE:
+                conf.provider_type = OSSCredProviderType::SIMPLE;
+                if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
+                    if (obj_info.has_encryption_info()) {
+                        AkSkPair plain;
+                        int ret = decrypt_ak_sk_helper(obj_info.ak(), 
obj_info.sk(),
+                                                       
obj_info.encryption_info(), &plain);
+                        if (ret != 0) {
+                            LOG(WARNING) << "Failed to decrypt OSS ak/sk";
+                            return std::nullopt;
+                        }
+                        conf.access_key_id = std::move(plain.first);
+                        conf.access_key_secret = std::move(plain.second);
+                    } else {
+                        conf.access_key_id = obj_info.ak();
+                        conf.access_key_secret = obj_info.sk();
+                    }
+                }
+                break;
+            default:
+                conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+                break;
+            }
+        } else {
+            if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
+                conf.provider_type = OSSCredProviderType::SIMPLE;
+                if (obj_info.has_encryption_info()) {
+                    AkSkPair plain;
+                    int ret = decrypt_ak_sk_helper(obj_info.ak(), 
obj_info.sk(),
+                                                   obj_info.encryption_info(), 
&plain);
+                    if (ret != 0) {
+                        LOG(WARNING) << "Failed to decrypt OSS ak/sk";
+                        return std::nullopt;
+                    }
+                    conf.access_key_id = std::move(plain.first);
+                    conf.access_key_secret = std::move(plain.second);
+                } else {
+                    conf.access_key_id = obj_info.ak();
+                    conf.access_key_secret = obj_info.sk();
+                }
+            } else {
+                conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+            }
+        }
+
+        if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) {
+            conf.role_arn = obj_info.role_arn();
+            conf.external_id = obj_info.external_id();
+        }
+    }
+
+    return conf;
+}
+
+uint64_t OSSConf::get_hash() const {
+    std::string hash_str = endpoint + bucket + prefix + region + access_key_id 
+ access_key_secret +
+                           role_arn + external_id + 
std::to_string(static_cast<int>(provider_type));
+    return std::hash<std::string> {}(hash_str);
+}
+
+// OSSAccessor implementation
+
+OSSAccessor::OSSAccessor(OSSConf conf)
+        : StorageVaultAccessor(AccessorType::OSS), conf_(std::move(conf)) {
+    uri_ = fmt::format("oss://{}/{}", conf_.bucket, conf_.prefix.empty() ? "" 
: conf_.prefix + "/");
+}
+
+OSSAccessor::~OSSAccessor() = default;
+
+int OSSAccessor::create(OSSConf conf, std::shared_ptr<OSSAccessor>* accessor) {
+    *accessor = std::make_shared<OSSAccessor>(std::move(conf));
+    return (*accessor)->init();
+}
+
+int OSSAccessor::init() {
+    static std::once_flag init_flag;
+    std::call_once(init_flag, []() {
+        AlibabaCloud::OSS::InitializeSdk();
+        LOG(INFO) << "Alibaba Cloud OSS SDK initialized";
+    });
+    _ca_cert_file_path =
+            
get_valid_ca_cert_path(doris::cloud::split(config::ca_cert_file_paths, ';'));
+    return create_oss_client();
+}
+
+int OSSAccessor::create_oss_client() {
+    std::lock_guard<std::mutex> lock(client_mutex_);
+
+    AlibabaCloud::OSS::ClientConfiguration oss_config;
+    oss_config.maxConnections = conf_.max_connections;
+    oss_config.connectTimeoutMs = conf_.connect_timeout_ms;
+    oss_config.requestTimeoutMs = conf_.request_timeout_ms;
+
+    if (_ca_cert_file_path.empty()) {
+        _ca_cert_file_path =
+                
get_valid_ca_cert_path(doris::cloud::split(config::ca_cert_file_paths, ';'));
+    }
+    if (!_ca_cert_file_path.empty()) {
+        oss_config.caFile = _ca_cert_file_path;
+    }
+
+    try {
+        if (conf_.provider_type == OSSCredProviderType::INSTANCE_PROFILE) {
+            if (!conf_.role_arn.empty()) {
+                if (!sts_credential_provider_) {
+                    std::string region = conf_.region.empty() ? "cn-hangzhou" 
: conf_.region;
+                    sts_credential_provider_ = 
std::make_shared<OSSSTSCredentialProvider>(
+                            conf_.role_arn, region, conf_.external_id);
+                }
+                oss_client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
+                        conf_.endpoint,
+                        
std::static_pointer_cast<AlibabaCloud::OSS::CredentialsProvider>(
+                                sts_credential_provider_),
+                        oss_config);
+                LOG(INFO) << "OSS client created with AssumeRole, endpoint=" 
<< conf_.endpoint
+                          << ", role_arn=" << conf_.role_arn;
+            } else {
+                if (!credentials_provider_) {
+                    credentials_provider_ = 
std::make_shared<ECSMetadataCredentialsProvider>();
+                }
+                oss_client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
+                        conf_.endpoint,
+                        
std::static_pointer_cast<AlibabaCloud::OSS::CredentialsProvider>(
+                                credentials_provider_),
+                        oss_config);
+                LOG(INFO) << "OSS client created with INSTANCE_PROFILE, 
endpoint="
+                          << conf_.endpoint;
+            }
+        } else if (conf_.provider_type == OSSCredProviderType::DEFAULT) {
+            if (!default_credential_provider_) {
+                default_credential_provider_ = 
std::make_shared<OSSDefaultCredentialsProvider>();
+            }
+            oss_client_ = std::make_shared<AlibabaCloud::OSS::OssClient>(
+                    conf_.endpoint,
+                    
std::static_pointer_cast<AlibabaCloud::OSS::CredentialsProvider>(
+                            default_credential_provider_),
+                    oss_config);
+            LOG(INFO) << "OSS client created with DEFAULT provider, endpoint=" 
<< conf_.endpoint;
+        } else if (conf_.provider_type == OSSCredProviderType::SIMPLE) {
+            AlibabaCloud::OSS::Credentials creds(conf_.access_key_id, 
conf_.access_key_secret,
+                                                 conf_.security_token);
+            oss_client_ = 
std::make_shared<AlibabaCloud::OSS::OssClient>(conf_.endpoint, creds,
+                                                                         
oss_config);
+            LOG(INFO) << "OSS client created with SIMPLE credentials, 
endpoint=" << conf_.endpoint;
+        } else {
+            LOG(ERROR) << "Unsupported OSS credential provider type";
+            return -1;
+        }
+    } catch (const std::exception& e) {
+        LOG(ERROR) << "Failed to create OSS client: " << e.what();
+        return -1;
+    }
+
+    return 0;
+}
+
+int OSSAccessor::refresh_client_if_needed() {
+    // Credential providers manage their own refresh internally via 
getCredentials().
+    return 0;
+}
+
+std::string OSSAccessor::get_key(const std::string& relative_path) const {
+    std::string path = relative_path;
+    if (!path.empty() && path[0] == '/') {
+        LOG(WARNING) << "OSS relative path should not start with '/': " << 
relative_path;
+        path = path.substr(1);
+    }
+    return conf_.prefix.empty() ? path : conf_.prefix + "/" + path;
+}
+
+std::string OSSAccessor::to_uri(const std::string& relative_path) const {
+    return uri_ + relative_path;
+}
+
+int OSSAccessor::convert_oss_error_code(const std::string& error_code) const {
+    if (error_code == "NoSuchKey" || error_code == "NoSuchBucket") {
+        return 1; // Not found
+    } else if (error_code == "AccessDenied" || error_code == 
"InvalidAccessKeyId") {
+        return -2; // Access denied
+    } else if (error_code == "SecurityTokenExpired") {
+        LOG(WARNING) << "OSS security token expired, will refresh on next 
operation";
+        return -3; // Credentials expired
+    } else if (error_code == "RequestTimeout" || error_code == 
"ConnectionTimeout") {
+        return -4; // Timeout
+    }
+
+    LOG(WARNING) << "OSS operation failed with error code: " << error_code;
+    return -1; // Generic error
+}
+
+std::shared_ptr<AlibabaCloud::OSS::OssClient> OSSAccessor::get_client() const {
+    std::lock_guard<std::mutex> lock(client_mutex_);
+    return oss_client_;
+}
+
+int OSSAccessor::put_file(const std::string& path, const std::string& content) 
{
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_put_latency);
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string key = get_key(path);
+
+    // Create stream from content
+    std::shared_ptr<std::stringstream> content_stream =
+            std::make_shared<std::stringstream>(content);
+
+    AlibabaCloud::OSS::PutObjectRequest request(conf_.bucket, key, 
content_stream);
+
+    auto outcome = client->PutObject(request);
+    if (!outcome.isSuccess()) {
+        LOG(WARNING) << "OSS PutObject failed: " << outcome.error().Code() << 
" - "
+                     << outcome.error().Message() << ", key: " << key;
+        return convert_oss_error_code(outcome.error().Code());
+    }
+
+    VLOG(1) << "OSS PutObject success: " << key << ", size: " << 
content.size();
+    return 0;
+}
+
+int OSSAccessor::delete_file(const std::string& path) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_delete_object_latency);
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string key = get_key(path);
+
+    auto outcome = client->DeleteObject(conf_.bucket, key);
+    if (!outcome.isSuccess()) {
+        // OSS DeleteObject returns success even if object doesn't exist
+        // Only log real errors
+        std::string error_code = outcome.error().Code();
+        if (error_code != "NoSuchKey") {
+            LOG(WARNING) << "OSS DeleteObject failed: " << error_code << " - "
+                         << outcome.error().Message() << ", key: " << key;
+            return convert_oss_error_code(error_code);
+        }
+    }
+
+    VLOG(1) << "OSS DeleteObject success: " << key;
+    return 0;
+}
+
+int OSSAccessor::delete_files(const std::vector<std::string>& paths) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_delete_objects_latency);
+
+    if (paths.empty()) {
+        return 0;
+    }
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    // OSS DeleteObjects supports batch delete (max 1000 keys per request)
+    const size_t batch_size = 1000;
+
+    for (size_t i = 0; i < paths.size(); i += batch_size) {
+        size_t end = std::min(i + batch_size, paths.size());
+
+        AlibabaCloud::OSS::DeletedKeyList keys;
+        for (size_t j = i; j < end; ++j) {
+            keys.push_back(get_key(paths[j]));
+        }
+
+        AlibabaCloud::OSS::DeleteObjectsRequest request(conf_.bucket);
+        request.setKeyList(keys);
+
+        auto outcome = client->DeleteObjects(request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS DeleteObjects failed: " << 
outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            return convert_oss_error_code(outcome.error().Code());
+        }
+
+        VLOG(1) << "OSS DeleteObjects success: deleted " << 
outcome.result().keyList().size()
+                << " objects (" << (end - i) << " requested)";
+    }
+
+    return 0;
+}
+
+int OSSAccessor::delete_prefix(const std::string& path_prefix, int64_t 
expiration_time) {
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string prefix = get_key(path_prefix);
+
+    // List all objects with prefix and delete them in batches
+    std::vector<std::string> keys_to_delete;
+    const size_t batch_size = 1000;
+
+    bool is_truncated = true;
+    std::string marker;
+
+    while (is_truncated) {
+        AlibabaCloud::OSS::ListObjectsRequest list_request(conf_.bucket);
+        list_request.setPrefix(prefix);
+        list_request.setMaxKeys(1000);
+
+        if (!marker.empty()) {
+            list_request.setMarker(marker);
+        }
+
+        auto outcome = client->ListObjects(list_request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS ListObjects failed: " << 
outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            return convert_oss_error_code(outcome.error().Code());
+        }
+
+        const auto& result = outcome.result();
+        const auto& objects = result.ObjectSummarys();
+
+        for (const auto& obj : objects) {
+            // Check expiration time if specified
+            if (expiration_time > 0) {
+                int64_t obj_mtime = 
parse_oss_last_modified(obj.LastModified());
+                if (obj_mtime >= expiration_time) {
+                    continue; // Skip objects newer than expiration time
+                }
+            }
+
+            keys_to_delete.push_back(obj.Key());
+
+            // Delete in batches
+            if (keys_to_delete.size() >= batch_size) {
+                AlibabaCloud::OSS::DeletedKeyList 
batch_keys(keys_to_delete.begin(),
+                                                             
keys_to_delete.end());
+                AlibabaCloud::OSS::DeleteObjectsRequest 
delete_request(conf_.bucket);
+                delete_request.setKeyList(batch_keys);
+
+                auto delete_outcome = client->DeleteObjects(delete_request);
+                if (!delete_outcome.isSuccess()) {
+                    LOG(WARNING) << "OSS DeleteObjects failed: " << 
delete_outcome.error().Code()
+                                 << " - " << delete_outcome.error().Message();
+                    return 
convert_oss_error_code(delete_outcome.error().Code());
+                }
+
+                VLOG(1) << "OSS deleted batch of " << 
delete_outcome.result().keyList().size()
+                        << " objects";
+                keys_to_delete.clear();
+            }
+        }
+
+        is_truncated = result.IsTruncated();
+        marker = result.NextMarker();
+    }
+
+    // Delete remaining keys
+    if (!keys_to_delete.empty()) {
+        AlibabaCloud::OSS::DeletedKeyList batch_keys(keys_to_delete.begin(), 
keys_to_delete.end());
+        AlibabaCloud::OSS::DeleteObjectsRequest delete_request(conf_.bucket);
+        delete_request.setKeyList(batch_keys);
+
+        auto delete_outcome = client->DeleteObjects(delete_request);
+        if (!delete_outcome.isSuccess()) {
+            LOG(WARNING) << "OSS DeleteObjects failed: " << 
delete_outcome.error().Code() << " - "
+                         << delete_outcome.error().Message();
+            return convert_oss_error_code(delete_outcome.error().Code());
+        }
+
+        VLOG(1) << "OSS deleted final batch of " << 
delete_outcome.result().keyList().size()
+                << " objects";
+    }
+
+    return 0;
+}
+
+int OSSAccessor::delete_directory(const std::string& dir_path) {
+    // For OSS, directory is just a prefix, same as delete_prefix
+    return delete_prefix(dir_path, 0);
+}
+
+int OSSAccessor::delete_all(int64_t expiration_time) {
+    // Delete all objects under the prefix
+    return delete_prefix("", expiration_time);
+}
+
+int OSSAccessor::list_directory(const std::string& dir_path, 
std::unique_ptr<ListIterator>* res) {
+    return list_prefix(dir_path, res);
+}
+
+int OSSAccessor::list_all(std::unique_ptr<ListIterator>* res) {
+    return list_prefix("", res);
+}
+
+int OSSAccessor::list_prefix(const std::string& path_prefix, 
std::unique_ptr<ListIterator>* res) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_list_latency);
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string prefix = get_key(path_prefix);
+
+    *res = std::make_unique<OSSListIterator>(client, conf_.bucket, prefix,
+                                             conf_.prefix.empty() ? 0 : 
conf_.prefix.length() + 1);
+    return 0;
+}
+
+int OSSAccessor::exists(const std::string& path) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_head_latency);
+
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    std::string key = get_key(path);
+
+    // Use DoesObjectExist for efficient check
+    bool exists = client->DoesObjectExist(conf_.bucket, key);
+

Review Comment:
   **MEDIUM — `DoesObjectExist()` can throw exceptions, crashing the recycler**
   
   Unlike other OSSAccessor methods that check `outcome.isSuccess()`, 
`DoesObjectExist()` can throw `ClientException` or `ServerException` on network 
errors. An unhandled exception here would crash the recycler process.
   
   Consider using `HeadObject` with `isSuccess()` check, or wrap in try/catch.



##########
be/src/io/fs/oss_file_writer.cpp:
##########
@@ -0,0 +1,503 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/oss_file_writer.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <bvar/reducer.h>
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <sstream>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/path.h"
+#include "io/fs/s3_file_bufferpool.h"
+#include "runtime/exec_env.h"
+#include "util/debug_points.h"
+
+namespace doris::io {
+
+// Existing metrics
+bvar::Adder<uint64_t> oss_file_writer_total("oss_file_writer_total_num");
+bvar::Adder<uint64_t> oss_bytes_written_total("oss_file_writer_bytes_written");
+bvar::Adder<uint64_t> oss_file_created_total("oss_file_writer_file_created");
+bvar::Adder<uint64_t> 
oss_file_being_written("oss_file_writer_file_being_written");
+
+// New async metrics
+bvar::Adder<int64_t> 
oss_file_writer_async_close_queuing("oss_file_writer_async_close_queuing");
+bvar::Adder<int64_t> oss_file_writer_async_close_processing(
+        "oss_file_writer_async_close_processing");
+
+OSSFileWriter::OSSFileWriter(std::shared_ptr<OSSClientHolder> client, 
std::string bucket,
+                             std::string key, const FileWriterOptions* opts)
+        : _path(fmt::format("oss://{}/{}", bucket, key)),
+          _bucket(std::move(bucket)),
+          _key(std::move(key)),
+          _client(std::move(client)),
+          _buffer_size(config::s3_write_buffer_size),
+          _used_by_oss_committer(opts ? opts->used_by_s3_committer : false) {
+    oss_file_writer_total << 1;
+    oss_file_being_written << 1;
+
+    _completed_parts.reserve(100);
+
+    init_cache_builder(opts, _path);
+}
+
+OSSFileWriter::~OSSFileWriter() {
+    // Wait for any pending async operations to complete
+    _wait_until_finish("~OSSFileWriter");
+

Review Comment:
   **HIGH — Potential use-after-free: destructor doesn't wait on async close 
future**
   
   The S3 writer's destructor checks `_async_close_pack != nullptr` and calls 
`_async_close_pack->future.get()` to wait for the async close thread pool task 
to complete. This OSS destructor only calls `_wait_until_finish()` which waits 
on `_countdown_event`, but does NOT wait on the async close future.
   
   If `close(true)` (non-blocking) was called and then the writer is destroyed 
before the async close completes, the thread pool task will access a destroyed 
object. Please add the same async-close-pack wait as S3:
   ```cpp
   if (_async_close_pack != nullptr) {
       std::ignore = _async_close_pack->future.get();
       _async_close_pack = nullptr;
   } else {
       _wait_until_finish(...);
   }
   ```



##########
fe/fe-core/src/test/java/org/apache/doris/catalog/OSSStorageVaultTest.java:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for OSSStorageVault.checkCreationProperties() validation logic.
+ *
+ * Mirrors the validation test patterns used in S3StorageVault tests.
+ * Each test targets one specific validation rule in checkCreationProperties().
+ */
+public class OSSStorageVaultTest {
+
+    // 
---------------------------------------------------------------------------
+    // Helper: build a minimal valid property map so individual tests can 
remove
+    // or override only the property they care about.
+    // 
---------------------------------------------------------------------------
+    private static Map<String, String> validProps() {
+        Map<String, String> props = new HashMap<>();

Review Comment:
   **CRITICAL — Test bug: `validProps()` missing `"type"` key — ALL tests will 
fail**
   
   `OSSStorageVault.checkCreationProperties()` calls 
`super.checkCreationProperties(properties)` first, which at 
`StorageVault.java:200` does:
   ```java
   Preconditions.checkArgument(type != null, "Missing property " + 
PropertyKey.TYPE);
   ```
   
   Since `validProps()` does not include a `"type"` key, every test that calls 
`check(validProps())` will throw `IllegalArgumentException` from the base class 
before reaching any OSS-specific validation.
   
   Fix: Add `props.put("type", "oss");` here.



##########
build.sh:
##########
@@ -497,6 +498,10 @@ else
     BUILD_JINDOFS='ON'
 fi
 
+if [[ -n "${DISABLE_BUILD_OSS}" ]]; then
+    BUILD_OSS='OFF'
+fi

Review Comment:
   **HIGH — `BUILD_OSS` is never passed to cmake for BE**
   
   The `BUILD_OSS` variable is set here but is never passed as 
`-DBUILD_OSS="${BUILD_OSS}"` in the cmake invocation for BE (around lines 
667-698 in the original file). The BE cmake invocation includes 
`-DBUILD_BENCHMARK`, `-DBUILD_AZURE`, etc., but not `-DBUILD_OSS`.
   
   This means `DISABLE_BUILD_OSS` has no effect on BE compilation — BE cmake 
always uses its default of `ON`.
   
   Fix: Add `-DBUILD_OSS="${BUILD_OSS}"` to the BE cmake command alongside the 
other BUILD flags.



##########
cloud/src/recycler/oss_accessor.cpp:
##########
@@ -0,0 +1,640 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/oss_accessor.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <alibabacloud/oss/client/ClientConfiguration.h>
+#include <bvar/reducer.h>
+#include <gen_cpp/cloud.pb.h>
+#include <time.h>
+
+#include <iomanip>
+#include <memory>
+#include <sstream>
+#include <stdexcept>
+#include <utility>
+
+#include "common/config.h"
+#include "common/encryption_util.h"
+#include "common/logging.h"
+#include "common/stopwatch.h"
+#include "common/string_util.h"
+#include "common/util.h"
+#include "cpp/aws_common.h"
+#include "cpp/oss_credential_provider.h"
+#include "recycler/util.h"
+
+namespace doris::cloud {
+
+namespace oss_bvar {
+bvar::LatencyRecorder oss_get_latency("oss_get");
+bvar::LatencyRecorder oss_put_latency("oss_put");
+bvar::LatencyRecorder oss_delete_object_latency("oss_delete_object");
+bvar::LatencyRecorder oss_delete_objects_latency("oss_delete_objects");
+bvar::LatencyRecorder oss_head_latency("oss_head");
+bvar::LatencyRecorder oss_list_latency("oss_list");
+} // namespace oss_bvar
+
+// Parse OSS LastModified (ISO 8601 UTC) to Unix timestamp
+static int64_t parse_oss_last_modified(const std::string& last_modified_str) {
+    std::tm tm = {};
+    std::istringstream ss(last_modified_str);
+    ss >> std::get_time(&tm, "%Y-%m-%dT%H:%M:%S");
+
+    if (ss.fail()) {
+        LOG(WARNING) << "Failed to parse OSS LastModified: " << 
last_modified_str;
+        return 0;
+    }
+
+    return static_cast<int64_t>(timegm(&tm)); // timegm() for UTC time
+}
+
+// OSS List Iterator implementation.
+//
+// Walks every object under `prefix` in `bucket`, fetching the listing lazily one
+// ListObjects page (up to 1000 keys) at a time. The first page is fetched by the
+// constructor. Paths handed out by next() have the first `prefix_length` characters
+// of the object key removed.
+class OSSListIterator final : public ListIterator {
+public:
+    OSSListIterator(std::shared_ptr<AlibabaCloud::OSS::OssClient> client, const std::string& bucket,
+                    const std::string& prefix, size_t prefix_length)
+            : client_(std::move(client)),
+              bucket_(bucket),
+              prefix_(prefix),
+              prefix_length_(prefix_length),
+              is_truncated_(true),
+              current_index_(0) {
+        // Fetch first batch
+        fetch_next_batch();
+    }
+
+    ~OSSListIterator() override = default;
+
+    // False once a ListObjects request has failed (see fetch_next_batch()).
+    bool is_valid() override { return valid_; }
+
+    // Returns true when next() will yield an object, fetching further pages on demand.
+    bool has_next() override {
+        // A truncated page may contain zero objects, so one extra fetch is not enough:
+        // keep going until a page yields something or the listing ends or fails
+        // (fetch_next_batch() clears is_truncated_ on failure, which ends the loop).
+        while (current_index_ >= objects_.size() && is_truncated_) {
+            fetch_next_batch();
+        }
+        return current_index_ < objects_.size();
+    }
+
+    // Returns the next object's metadata, or std::nullopt when the listing is exhausted.
+    std::optional<FileMeta> next() override {
+        if (!has_next()) {
+            return std::nullopt;
+        }
+
+        const auto& obj = objects_[current_index_++];
+        return FileMeta {.path = get_relative_path(obj.Key()),
+                         .size = obj.Size(),
+                         .mtime_s = parse_oss_last_modified(obj.LastModified())};
+    }
+
+private:
+    // Requests the page that follows next_marker_ (or the first page when it is empty)
+    // and replaces the buffered objects with it. On failure the iterator becomes
+    // invalid and no further pages are requested.
+    void fetch_next_batch() {
+        AlibabaCloud::OSS::ListObjectsRequest request(bucket_);
+        request.setPrefix(prefix_);
+        request.setMaxKeys(1000); // OSS default max keys per request
+
+        if (!next_marker_.empty()) {
+            request.setMarker(next_marker_);
+        }
+
+        auto outcome = client_->ListObjects(request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS ListObjects failed: " << outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            valid_ = false;
+            is_truncated_ = false;
+            return;
+        }
+
+        const auto& result = outcome.result();
+        objects_ = result.ObjectSummarys();
+        is_truncated_ = result.IsTruncated();
+        next_marker_ = result.NextMarker();
+        current_index_ = 0;
+        valid_ = true;
+
+        // With an empty marker the next request would start over from the first page
+        // and the iteration would never terminate. Resume after the last key we saw,
+        // or stop if there is nothing to resume from.
+        if (is_truncated_ && next_marker_.empty()) {
+            if (!objects_.empty()) {
+                next_marker_ = objects_.back().Key();
+            } else {
+                LOG(WARNING) << "OSS ListObjects returned a truncated empty page without "
+                                "NextMarker, stop listing, prefix: "
+                             << prefix_;
+                is_truncated_ = false;
+            }
+        }
+    }
+
+    // Drops the first prefix_length_ characters of `key`; keys shorter than that are
+    // returned unchanged.
+    std::string get_relative_path(const std::string& key) const {
+        if (key.length() >= prefix_length_) {
+            return key.substr(prefix_length_);
+        }
+        return key;
+    }
+
+    std::shared_ptr<AlibabaCloud::OSS::OssClient> client_;
+    std::string bucket_;
+    std::string prefix_;
+    size_t prefix_length_;
+    bool valid_ {false};
+    bool is_truncated_;       // true while more pages may follow the buffered one
+    std::string next_marker_; // marker to pass to the next ListObjects request
+    AlibabaCloud::OSS::ObjectSummaryList objects_; // objects of the current page
+    size_t current_index_;    // index into objects_ of the next object to hand out
+};
+
+// OSSConf implementation
+
+// Builds an OSSConf from an ObjectStoreInfoPB.
+// Returns std::nullopt when the provider is not OSS or when the stored ak/sk cannot
+// be decrypted. With skip_aksk set, only endpoint/bucket/prefix/region are filled in.
+std::optional<OSSConf> OSSConf::from_obj_store_info(const ObjectStoreInfoPB& obj_info,
+                                                    bool skip_aksk) {
+    if (obj_info.provider() != ObjectStoreInfoPB_Provider_OSS) {
+        return std::nullopt;
+    }
+
+    OSSConf conf;
+    conf.endpoint = normalize_oss_endpoint(obj_info.endpoint());
+    conf.bucket = obj_info.bucket();
+    conf.prefix = obj_info.prefix();
+    conf.region = obj_info.region();
+
+    if (skip_aksk) {
+        return conf;
+    }
+
+    // Copies ak/sk into conf, decrypting them first when encryption info is present.
+    // Returns false when decryption fails.
+    auto load_ak_sk = [&obj_info, &conf]() -> bool {
+        if (!obj_info.has_encryption_info()) {
+            conf.access_key_id = obj_info.ak();
+            conf.access_key_secret = obj_info.sk();
+            return true;
+        }
+        AkSkPair plain;
+        int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(),
+                                       &plain);
+        if (ret != 0) {
+            LOG(WARNING) << "Failed to decrypt OSS ak/sk";
+            return false;
+        }
+        conf.access_key_id = std::move(plain.first);
+        conf.access_key_secret = std::move(plain.second);
+        return true;
+    };
+
+    const bool has_ak_sk = !obj_info.ak().empty() && !obj_info.sk().empty();
+
+    if (!obj_info.has_cred_provider_type()) {
+        // No explicit provider type: static keys if both are present, otherwise
+        // instance profile.
+        if (has_ak_sk) {
+            conf.provider_type = OSSCredProviderType::SIMPLE;
+            if (!load_ak_sk()) {
+                return std::nullopt;
+            }
+        } else {
+            conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+        }
+    } else {
+        switch (obj_info.cred_provider_type()) {
+        case CredProviderTypePB::DEFAULT:
+            conf.provider_type = OSSCredProviderType::DEFAULT;
+            break;
+        case CredProviderTypePB::SIMPLE:
+            conf.provider_type = OSSCredProviderType::SIMPLE;
+            if (has_ak_sk && !load_ak_sk()) {
+                return std::nullopt;
+            }
+            break;
+        case CredProviderTypePB::INSTANCE_PROFILE:
+        default:
+            // Any other provider type is handled as instance profile.
+            conf.provider_type = OSSCredProviderType::INSTANCE_PROFILE;
+            break;
+        }
+    }
+
+    if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) {
+        conf.role_arn = obj_info.role_arn();
+        conf.external_id = obj_info.external_id();
+    }
+
+    return conf;
+}
+
+// Hash of the configuration fields listed below plus the provider type.
+uint64_t OSSConf::get_hash() const {
+    // Terminate every field with '\0' before hashing. Plain concatenation would make
+    // e.g. endpoint "ab" + bucket "c" indistinguishable from endpoint "a" + bucket "bc".
+    std::string hash_str;
+    for (const std::string* field : {&endpoint, &bucket, &prefix, &region, &access_key_id,
+                                     &access_key_secret, &role_arn, &external_id}) {
+        hash_str += *field;
+        hash_str += '\0';
+    }
+    hash_str += std::to_string(static_cast<int>(provider_type));
+    return std::hash<std::string> {}(hash_str);
+}
+
+// OSSAccessor implementation
+
+// Stores the configuration and precomputes the root URI:
+// "oss://<bucket>/" followed by "<prefix>/" when a prefix is configured.
+OSSAccessor::OSSAccessor(OSSConf conf)
+        : StorageVaultAccessor(AccessorType::OSS), conf_(std::move(conf)) {
+    std::string prefix_part;
+    if (!conf_.prefix.empty()) {
+        prefix_part = conf_.prefix + "/";
+    }
+    uri_ = fmt::format("oss://{}/{}", conf_.bucket, prefix_part);
+}
+
+OSSAccessor::~OSSAccessor() = default;
+
+// Constructs an accessor from `conf`, stores it in `*accessor`, and returns the
+// result of its init(). `*accessor` is assigned before init() runs, so it is set
+// even when init() fails.
+int OSSAccessor::create(OSSConf conf, std::shared_ptr<OSSAccessor>* accessor) {
+    auto created = std::make_shared<OSSAccessor>(std::move(conf));
+    *accessor = created;
+    return created->init();
+}
+
+// Initializes the OSS SDK (once, guarded by a function-local once_flag), resolves
+// the CA certificate path from config::ca_cert_file_paths, and creates the client.
+// Returns the result of create_oss_client().
+int OSSAccessor::init() {
+    static std::once_flag sdk_init_once;
+    const auto init_sdk = [] {
+        AlibabaCloud::OSS::InitializeSdk();
+        LOG(INFO) << "Alibaba Cloud OSS SDK initialized";
+    };
+    std::call_once(sdk_init_once, init_sdk);
+
+    // config::ca_cert_file_paths is a ';'-separated list of candidate files.
+    _ca_cert_file_path =
+            get_valid_ca_cert_path(doris::cloud::split(config::ca_cert_file_paths, ';'));
+
+    return create_oss_client();
+}
+
+// (Re)creates oss_client_ according to conf_.provider_type while holding client_mutex_.
+// Credential provider objects are created on first use and kept for later calls.
+// Returns 0 on success, -1 for an unsupported provider type or when construction throws.
+int OSSAccessor::create_oss_client() {
+    using AlibabaCloud::OSS::CredentialsProvider;
+    using AlibabaCloud::OSS::OssClient;
+
+    std::lock_guard<std::mutex> lock(client_mutex_);
+
+    // Resolve the CA bundle if init() has not found one yet.
+    if (_ca_cert_file_path.empty()) {
+        _ca_cert_file_path =
+                get_valid_ca_cert_path(doris::cloud::split(config::ca_cert_file_paths, ';'));
+    }
+
+    AlibabaCloud::OSS::ClientConfiguration oss_config;
+    oss_config.maxConnections = conf_.max_connections;
+    oss_config.connectTimeoutMs = conf_.connect_timeout_ms;
+    oss_config.requestTimeoutMs = conf_.request_timeout_ms;
+    if (!_ca_cert_file_path.empty()) {
+        oss_config.caFile = _ca_cert_file_path;
+    }
+
+    try {
+        switch (conf_.provider_type) {
+        case OSSCredProviderType::INSTANCE_PROFILE:
+            if (conf_.role_arn.empty()) {
+                // Plain instance profile: credentials come from the ECS metadata provider.
+                if (!credentials_provider_) {
+                    credentials_provider_ = std::make_shared<ECSMetadataCredentialsProvider>();
+                }
+                oss_client_ = std::make_shared<OssClient>(
+                        conf_.endpoint,
+                        std::static_pointer_cast<CredentialsProvider>(credentials_provider_),
+                        oss_config);
+                LOG(INFO) << "OSS client created with INSTANCE_PROFILE, endpoint="
+                          << conf_.endpoint;
+            } else {
+                // A role ARN is configured: use the STS AssumeRole provider.
+                if (!sts_credential_provider_) {
+                    std::string region = conf_.region.empty() ? "cn-hangzhou" : conf_.region;
+                    sts_credential_provider_ = std::make_shared<OSSSTSCredentialProvider>(
+                            conf_.role_arn, region, conf_.external_id);
+                }
+                oss_client_ = std::make_shared<OssClient>(
+                        conf_.endpoint,
+                        std::static_pointer_cast<CredentialsProvider>(sts_credential_provider_),
+                        oss_config);
+                LOG(INFO) << "OSS client created with AssumeRole, endpoint=" << conf_.endpoint
+                          << ", role_arn=" << conf_.role_arn;
+            }
+            break;
+        case OSSCredProviderType::DEFAULT:
+            if (!default_credential_provider_) {
+                default_credential_provider_ = std::make_shared<OSSDefaultCredentialsProvider>();
+            }
+            oss_client_ = std::make_shared<OssClient>(
+                    conf_.endpoint,
+                    std::static_pointer_cast<CredentialsProvider>(default_credential_provider_),
+                    oss_config);
+            LOG(INFO) << "OSS client created with DEFAULT provider, endpoint=" << conf_.endpoint;
+            break;
+        case OSSCredProviderType::SIMPLE: {
+            // Static credentials taken directly from the configuration.
+            AlibabaCloud::OSS::Credentials creds(conf_.access_key_id, conf_.access_key_secret,
+                                                 conf_.security_token);
+            oss_client_ = std::make_shared<OssClient>(conf_.endpoint, creds, oss_config);
+            LOG(INFO) << "OSS client created with SIMPLE credentials, endpoint=" << conf_.endpoint;
+            break;
+        }
+        default:
+            LOG(ERROR) << "Unsupported OSS credential provider type";
+            return -1;
+        }
+    } catch (const std::exception& e) {
+        LOG(ERROR) << "Failed to create OSS client: " << e.what();
+        return -1;
+    }
+
+    return 0;
+}
+
+// Nothing to refresh here: the credential providers renew themselves inside
+// getCredentials(). Always returns 0.
+int OSSAccessor::refresh_client_if_needed() {
+    constexpr int kNothingToRefresh = 0;
+    return kNothingToRefresh;
+}
+
+// Maps a path relative to this accessor onto the object key inside the bucket:
+// "<prefix>/<path>", or just "<path>" when no prefix is configured. A single
+// leading '/' on the relative path is dropped (with a warning).
+std::string OSSAccessor::get_key(const std::string& relative_path) const {
+    const bool has_leading_slash = !relative_path.empty() && relative_path.front() == '/';
+    if (has_leading_slash) {
+        LOG(WARNING) << "OSS relative path should not start with '/': " << relative_path;
+    }
+
+    const std::string path = has_leading_slash ? relative_path.substr(1) : relative_path;
+    if (conf_.prefix.empty()) {
+        return path;
+    }
+    return conf_.prefix + "/" + path;
+}
+
+// Returns the accessor's root URI with `relative_path` appended verbatim.
+std::string OSSAccessor::to_uri(const std::string& relative_path) const {
+    std::string full_uri = uri_;
+    full_uri += relative_path;
+    return full_uri;
+}
+
+int OSSAccessor::convert_oss_error_code(const std::string& error_code) const {
+    if (error_code == "NoSuchKey" || error_code == "NoSuchBucket") {
+        return 1; // Not found
+    } else if (error_code == "AccessDenied" || error_code == 
"InvalidAccessKeyId") {
+        return -2; // Access denied
+    } else if (error_code == "SecurityTokenExpired") {
+        LOG(WARNING) << "OSS security token expired, will refresh on next 
operation";
+        return -3; // Credentials expired
+    } else if (error_code == "RequestTimeout" || error_code == 
"ConnectionTimeout") {
+        return -4; // Timeout
+    }
+
+    LOG(WARNING) << "OSS operation failed with error code: " << error_code;
+    return -1; // Generic error
+}
+
+// Returns a copy of the current client pointer, taken while holding client_mutex_.
+std::shared_ptr<AlibabaCloud::OSS::OssClient> OSSAccessor::get_client() const {
+    std::lock_guard<std::mutex> guard(client_mutex_);
+    auto current_client = oss_client_;
+    return current_client;
+}
+
+// Uploads `content` as the object at `path` (relative to this accessor).
+// Returns 0 on success, otherwise the value from convert_oss_error_code().
+int OSSAccessor::put_file(const std::string& path, const std::string& content) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_put_latency);
+
+    if (int rc = refresh_client_if_needed(); rc != 0) {
+        return rc;
+    }
+
+    auto client = get_client();
+    const std::string key = get_key(path);
+
+    // The request takes its payload as a shared stream.
+    auto body = std::make_shared<std::stringstream>(content);
+    AlibabaCloud::OSS::PutObjectRequest request(conf_.bucket, key, body);
+
+    auto outcome = client->PutObject(request);
+    if (outcome.isSuccess()) {
+        VLOG(1) << "OSS PutObject success: " << key << ", size: " << content.size();
+        return 0;
+    }
+
+    LOG(WARNING) << "OSS PutObject failed: " << outcome.error().Code() << " - "
+                 << outcome.error().Message() << ", key: " << key;
+    return convert_oss_error_code(outcome.error().Code());
+}
+
+// Deletes the object at `path` (relative to this accessor).
+// A "NoSuchKey" error is treated as success. Returns 0 on success, otherwise the
+// value from convert_oss_error_code().
+int OSSAccessor::delete_file(const std::string& path) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_delete_object_latency);
+
+    if (int rc = refresh_client_if_needed(); rc != 0) {
+        return rc;
+    }
+
+    auto client = get_client();
+    const std::string key = get_key(path);
+
+    auto outcome = client->DeleteObject(conf_.bucket, key);
+    // A missing object is not an error for a delete; only other failures are reported.
+    const bool real_failure = !outcome.isSuccess() && outcome.error().Code() != "NoSuchKey";
+    if (real_failure) {
+        const std::string error_code = outcome.error().Code();
+        LOG(WARNING) << "OSS DeleteObject failed: " << error_code << " - "
+                     << outcome.error().Message() << ", key: " << key;
+        return convert_oss_error_code(error_code);
+    }
+
+    VLOG(1) << "OSS DeleteObject success: " << key;
+    return 0;
+}
+
+// Deletes the objects at `paths` (relative to this accessor), sending them in
+// DeleteObjects requests of at most 1000 keys each. Stops at the first failing
+// request. Returns 0 on success, otherwise the value from convert_oss_error_code().
+int OSSAccessor::delete_files(const std::vector<std::string>& paths) {
+    SCOPED_BVAR_LATENCY(oss_bvar::oss_delete_objects_latency);
+
+    if (paths.empty()) {
+        return 0;
+    }
+
+    if (int rc = refresh_client_if_needed(); rc != 0) {
+        return rc;
+    }
+
+    auto client = get_client();
+
+    // OSS DeleteObjects supports batch delete (max 1000 keys per request)
+    const size_t max_keys_per_request = 1000;
+
+    for (size_t begin = 0; begin < paths.size(); begin += max_keys_per_request) {
+        const size_t count = std::min(max_keys_per_request, paths.size() - begin);
+
+        AlibabaCloud::OSS::DeletedKeyList keys;
+        for (size_t offset = 0; offset < count; ++offset) {
+            keys.push_back(get_key(paths[begin + offset]));
+        }
+
+        AlibabaCloud::OSS::DeleteObjectsRequest request(conf_.bucket);
+        request.setKeyList(keys);
+
+        auto outcome = client->DeleteObjects(request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS DeleteObjects failed: " << outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            return convert_oss_error_code(outcome.error().Code());
+        }
+
+        VLOG(1) << "OSS DeleteObjects success: deleted " << outcome.result().keyList().size()
+                << " objects (" << count << " requested)";
+    }
+
+    return 0;
+}
+
+// Deletes every object whose key starts with get_key(path_prefix).
+//
+// When expiration_time > 0, only objects whose LastModified is strictly older than
+// expiration_time (Unix seconds) are deleted; objects whose LastModified cannot be
+// parsed are kept. Objects are deleted in DeleteObjects batches of up to 1000 keys.
+//
+// Returns 0 on success, the value from convert_oss_error_code() when a list or
+// delete request fails, or -1 when the listing cannot be continued.
+int OSSAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) {
+    int ret = refresh_client_if_needed();
+    if (ret != 0) {
+        return ret;
+    }
+
+    auto client = get_client();
+
+    const std::string prefix = get_key(path_prefix);
+
+    const size_t batch_size = 1000;
+    std::vector<std::string> keys_to_delete;
+    keys_to_delete.reserve(batch_size);
+
+    // Sends the pending keys in one DeleteObjects request and clears them on success.
+    // Returns 0 on success (or when nothing is pending), a converted error otherwise.
+    auto flush_pending = [&]() -> int {
+        if (keys_to_delete.empty()) {
+            return 0;
+        }
+        AlibabaCloud::OSS::DeletedKeyList batch_keys(keys_to_delete.begin(),
+                                                     keys_to_delete.end());
+        AlibabaCloud::OSS::DeleteObjectsRequest delete_request(conf_.bucket);
+        delete_request.setKeyList(batch_keys);
+
+        auto delete_outcome = client->DeleteObjects(delete_request);
+        if (!delete_outcome.isSuccess()) {
+            LOG(WARNING) << "OSS DeleteObjects failed: " << delete_outcome.error().Code()
+                         << " - " << delete_outcome.error().Message();
+            return convert_oss_error_code(delete_outcome.error().Code());
+        }
+
+        VLOG(1) << "OSS deleted batch of " << delete_outcome.result().keyList().size()
+                << " objects";
+        keys_to_delete.clear();
+        return 0;
+    };
+
+    bool is_truncated = true;
+    bool listing_incomplete = false;
+    std::string marker;
+
+    while (is_truncated) {
+        AlibabaCloud::OSS::ListObjectsRequest list_request(conf_.bucket);
+        list_request.setPrefix(prefix);
+        list_request.setMaxKeys(1000);
+
+        if (!marker.empty()) {
+            list_request.setMarker(marker);
+        }
+
+        auto outcome = client->ListObjects(list_request);
+        if (!outcome.isSuccess()) {
+            LOG(WARNING) << "OSS ListObjects failed: " << outcome.error().Code() << " - "
+                         << outcome.error().Message();
+            return convert_oss_error_code(outcome.error().Code());
+        }
+
+        const auto& result = outcome.result();
+        const auto& objects = result.ObjectSummarys();
+
+        for (const auto& obj : objects) {
+            // Check expiration time if specified
+            if (expiration_time > 0) {
+                int64_t obj_mtime = parse_oss_last_modified(obj.LastModified());
+                if (obj_mtime <= 0) {
+                    // parse_oss_last_modified() returns 0 when parsing fails. Treating
+                    // that as "very old" would delete an object of unknown age, so keep it.
+                    LOG(WARNING) << "OSS object has unparsable LastModified, skip deleting, key: "
+                                 << obj.Key();
+                    continue;
+                }
+                if (obj_mtime >= expiration_time) {
+                    continue; // Skip objects newer than expiration time
+                }
+            }
+
+            keys_to_delete.push_back(obj.Key());
+
+            // Delete in batches
+            if (keys_to_delete.size() >= batch_size) {
+                ret = flush_pending();
+                if (ret != 0) {
+                    return ret;
+                }
+            }
+        }
+
+        is_truncated = result.IsTruncated();
+        marker = result.NextMarker();
+
+        // With an empty marker the next request would list the first page again and
+        // the loop might never end. Resume after the last key of this page, or give
+        // up when the page has no key to resume from.
+        if (is_truncated && marker.empty()) {
+            if (objects.empty()) {
+                LOG(WARNING) << "OSS ListObjects returned a truncated empty page without "
+                                "NextMarker, stop listing, prefix: "
+                             << prefix;
+                listing_incomplete = true;
+                break;
+            }
+            marker = objects.back().Key();
+        }
+    }
+
+    // Delete remaining keys
+    ret = flush_pending();
+    if (ret != 0) {
+        return ret;
+    }
+
+    return listing_incomplete ? -1 : 0;
+}
+
+// Deletes every object under the directory `dir_path` (relative to this accessor).
+//
+// An empty `dir_path` is rejected with -1: passed through to delete_prefix() it would
+// match every object under the configured prefix. Otherwise returns the result of
+// delete_prefix() with no expiration filter.
+int OSSAccessor::delete_directory(const std::string& dir_path) {
+    if (dir_path.empty()) {
+        LOG(WARNING) << "OSS delete_directory rejected: empty path is not allowed, uri: " << uri_;
+        return -1;
+    }
+
+    // For OSS a directory is just a key prefix. Make sure it ends with '/' so that
+    // deleting "a/b" cannot also remove siblings such as "a/bc/...".
+    return delete_prefix(dir_path.back() == '/' ? dir_path : dir_path + '/', 0);
+}

Review Comment:
   **HIGH — Missing empty-path guard: `delete_directory("")` will delete all 
objects**
   
   Unlike `S3Accessor::delete_directory()` which rejects empty `dir_path` with 
`return -1`, this directly delegates to `delete_prefix(dir_path, 0)`. When 
called with an empty string, this behaves identically to `delete_all(0)` — 
deleting all objects under the configured prefix with no expiration filter.
   
   The test at `oss_accessor_test.cpp:61-62` asserts `ASSERT_NE(ret, 0)` for 
`delete_directory("")`, so this test will fail.
   
   Fix: Add the same empty-path validation and trailing-slash normalization as 
S3Accessor.



##########
common/cpp/oss_credential_provider.cpp:
##########
@@ -0,0 +1,456 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cpp/oss_credential_provider.h"
+
+#ifdef USE_OSS
+
+#include <alibabacloud/oss/auth/Credentials.h>
+#include <alibabacloud/oss/auth/CredentialsProvider.h>
+#include <curl/curl.h>
+#include <rapidjson/document.h>
+#include <time.h>
+
+#include <alibabacloud/Sts20150401.hpp>
+#include <alibabacloud/credentials/Client.hpp>
+#include <alibabacloud/utils/models/Config.hpp>
+#include <darabonba/Runtime.hpp>
+#include <iomanip>
+#include <sstream>
+#include <stdexcept>
+
+#include "common/logging.h"
+
+namespace {
+// Returns `cred` with its middle replaced by '*' so it can be logged:
+//   up to 4 characters  -> every character masked
+//   5 to 8 characters   -> first 2 and last 2 kept
+//   more than 8         -> first 4 and last 4 kept
+std::string mask_credential(const std::string& cred) {
+    const size_t total = cred.length();
+
+    // Number of characters left readable at each end.
+    size_t visible = 4;
+    if (total <= 4) {
+        visible = 0;
+    } else if (total <= 8) {
+        visible = 2;
+    }
+
+    if (visible == 0) {
+        return std::string(total, '*');
+    }
+
+    std::string masked = cred.substr(0, visible);
+    masked += std::string(total - 2 * visible, '*');
+    masked += cred.substr(total - visible);
+    return masked;
+}
+} // namespace
+
+namespace doris {
+
+// libcurl write callback: appends the received chunk of size * nmemb bytes to the
+// std::string passed as user data and reports the whole chunk as consumed.
+static size_t curl_write_callback(void* contents, size_t size, size_t nmemb, std::string* userp) {
+    const size_t chunk_bytes = size * nmemb;
+    const char* chunk = static_cast<char*>(contents);
+    userp->append(chunk, chunk_bytes);
+    return chunk_bytes;
+}
+
+// ---- ECSMetadataCredentialsProvider ----
+
+// Starts with no cached credentials and an expiration of "now".
+ECSMetadataCredentialsProvider::ECSMetadataCredentialsProvider()
+        : _cached_credentials(nullptr), _expiration(std::chrono::system_clock::now()) {
+    LOG(INFO) << "ECSMetadataCredentialsProvider initialized";
+}
+
+// True when the time left until _expiration is REFRESH_BEFORE_EXPIRY_SECONDS or less.
+bool ECSMetadataCredentialsProvider::_is_expired() const {
+    const auto remaining = std::chrono::duration_cast<std::chrono::seconds>(
+            _expiration - std::chrono::system_clock::now());
+    return remaining.count() <= REFRESH_BEFORE_EXPIRY_SECONDS;
+}
+
+// Returns credentials for the ECS instance role.
+//
+// Cached credentials are returned while _is_expired() is false. Otherwise new ones are
+// fetched with _fetch_credentials_outside_lock(), which is called while _mtx is not
+// held. If the fetch fails, the previously cached credentials are returned when there
+// are any, and empty credentials ("", "", "") otherwise.
+AlibabaCloud::OSS::Credentials 
ECSMetadataCredentialsProvider::getCredentials() {
+    // Step 1 (under _mtx): serve from the cache when possible, otherwise log why a
+    // fetch is needed.
+    {
+        std::lock_guard<std::mutex> lock(_mtx);
+        if (_cached_credentials && !_is_expired()) {
+            VLOG(2) << "Returning cached ECS metadata credentials";
+            return *_cached_credentials;
+        }
+        if (_cached_credentials) {
+            auto t = std::chrono::system_clock::to_time_t(_expiration);
+            struct tm tm_buf;
+            LOG(INFO) << "ECS metadata credentials expiring ("
+                      << std::put_time(localtime_r(&t, &tm_buf), "%Y-%m-%d 
%H:%M:%S")
+                      << "), refreshing";
+        } else {
+            LOG(INFO) << "Fetching ECS metadata credentials (first time)";
+        }
+    }
+
+    // Step 2 (lock released): fetch into locals so the members are only touched
+    // under the lock.
+    std::unique_ptr<AlibabaCloud::OSS::Credentials> new_credentials;
+    std::chrono::system_clock::time_point new_expiration;
+
+    if (_fetch_credentials_outside_lock(new_credentials, new_expiration) != 0) 
{
+        std::lock_guard<std::mutex> lock(_mtx);
+        if (_cached_credentials) {
+            LOG(WARNING) << "Using expired ECS metadata credentials as 
fallback";
+            return *_cached_credentials;
+        }
+        LOG(ERROR) << "Failed to fetch credentials from ECS metadata service 
and no cached "
+                      "fallback available";
+        return AlibabaCloud::OSS::Credentials("", "", "");
+    }
+
+    // Step 3 (under _mtx): publish the result, unless the cache already holds
+    // unexpired credentials by the time the lock is re-acquired.
+    {
+        std::lock_guard<std::mutex> lock(_mtx);
+        if (_cached_credentials && !_is_expired()) {
+            return *_cached_credentials;
+        }
+        _cached_credentials = std::move(new_credentials);
+        _expiration = new_expiration;
+        auto t = std::chrono::system_clock::to_time_t(_expiration);
+        struct tm tm_buf;
+        LOG(INFO) << "ECS metadata credentials refreshed, expiry: "
+                  << std::put_time(localtime_r(&t, &tm_buf), "%Y-%m-%d 
%H:%M:%S");
+        return *_cached_credentials;
+    }
+}
+
+// Performs an HTTP GET on `url` with libcurl and stores the body in `response`.
+// Returns 0 only when the transfer succeeds and the HTTP status is 200; -1 otherwise.
+int ECSMetadataCredentialsProvider::_http_get(const std::string& url, std::string& response) {
+    // The handle is released by the deleter on every return path.
+    std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> handle(curl_easy_init(),
+                                                               &curl_easy_cleanup);
+    if (!handle) {
+        LOG(ERROR) << "Failed to initialize CURL";
+        return -1;
+    }
+    CURL* const curl = handle.get();
+
+    response.clear();
+    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_callback);
+    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
+    curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, METADATA_SERVICE_TIMEOUT_MS);
+    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
+
+    const CURLcode transfer_result = curl_easy_perform(curl);
+    if (transfer_result != CURLE_OK) {
+        LOG(ERROR) << "ECS metadata HTTP GET failed: " << curl_easy_strerror(transfer_result);
+        return -1;
+    }
+
+    long status = 0;
+    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
+    if (status != 200) {
+        LOG(ERROR) << "ECS metadata service returned HTTP " << status;
+        return -1;
+    }
+    return 0;
+}
+
+// Asks the metadata service which RAM role is attached to this instance and stores
+// its name in `role_name`. When the response lists several names (one per line), the
+// first one is used. Returns 0 on success, -1 when the request fails or no role name
+// is returned.
+int ECSMetadataCredentialsProvider::_get_role_name(std::string& role_name) {
+    std::string url = std::string("http://") + METADATA_SERVICE_HOST + METADATA_SERVICE_PATH;
+    std::string response;
+    if (_http_get(url, response) != 0) {
+        return -1;
+    }
+
+    // Strips leading and trailing whitespace in place.
+    const auto trim = [](std::string& text) {
+        const auto not_space = [](unsigned char ch) { return !std::isspace(ch); };
+        text.erase(text.begin(), std::find_if(text.begin(), text.end(), not_space));
+        text.erase(std::find_if(text.rbegin(), text.rend(), not_space).base(), text.end());
+    };
+
+    role_name = response;
+    trim(role_name);
+
+    if (role_name.empty()) {
+        LOG(ERROR) << "No RAM role attached to this ECS instance";
+        return -1;
+    }
+
+    size_t newline_pos = role_name.find('\n');
+    if (newline_pos != std::string::npos) {
+        std::string all_roles = role_name;
+        role_name = role_name.substr(0, newline_pos);
+        // The first line can still end in '\r' (CRLF line endings); the name is
+        // appended to a URL later, so strip it.
+        trim(role_name);
+        LOG(WARNING) << "Multiple RAM roles found, using first: " << role_name
+                     << " (all: " << all_roles << ")";
+    }
+
+    LOG(INFO) << "ECS RAM role: " << role_name;
+    return 0;
+}
+
+// Fetches temporary credentials for the instance's RAM role from the metadata service.
+//
+// On success stores the credentials in `out_credentials`, their expiration in
+// `out_expiration`, and returns 0. Returns -1 when a request fails, the response is
+// not the expected JSON object, a required field is missing or not a string, a
+// credential is empty, or the expiration cannot be parsed. This function does not
+// touch the cached members.
+int ECSMetadataCredentialsProvider::_fetch_credentials_outside_lock(
+        std::unique_ptr<AlibabaCloud::OSS::Credentials>& out_credentials,
+        std::chrono::system_clock::time_point& out_expiration) {
+    std::string role_name;
+    if (_get_role_name(role_name) != 0) {
+        return -1;
+    }
+
+    std::string url =
+            std::string("http://") + METADATA_SERVICE_HOST + METADATA_SERVICE_PATH + role_name;
+    std::string response;
+    if (_http_get(url, response) != 0) {
+        return -1;
+    }
+
+    rapidjson::Document doc;
+    doc.Parse(response.c_str());
+
+    // The body comes from the network: check that it is an object before looking up
+    // members, and that a member is a string before reading it as one.
+    if (doc.HasParseError() || !doc.IsObject()) {
+        LOG(ERROR) << "Failed to parse ECS metadata JSON response";
+        return -1;
+    }
+
+    // Returns the string value of member `name`, or nullptr when the member is
+    // absent or not a string.
+    const auto string_field = [&doc](const char* name) -> const char* {
+        auto member = doc.FindMember(name);
+        if (member == doc.MemberEnd() || !member->value.IsString()) {
+            return nullptr;
+        }
+        return member->value.GetString();
+    };
+
+    const char* code = string_field("Code");
+    if (code == nullptr || std::string(code) != "Success") {
+        const char* message = string_field("Message");
+        LOG(ERROR) << "ECS metadata error: " << (message != nullptr ? message : "unknown");
+        return -1;
+    }
+
+    const char* ak_field = string_field("AccessKeyId");
+    const char* sk_field = string_field("AccessKeySecret");
+    const char* token_field = string_field("SecurityToken");
+    const char* expiry_field = string_field("Expiration");
+    if (ak_field == nullptr || sk_field == nullptr || token_field == nullptr ||
+        expiry_field == nullptr) {
+        LOG(ERROR) << "ECS metadata response missing required fields";
+        return -1;
+    }
+
+    std::string ak = ak_field;
+    std::string sk = sk_field;
+    std::string token = token_field;
+    std::string expiry_str = expiry_field;
+
+    if (ak.empty() || sk.empty() || token.empty()) {
+        LOG(ERROR) << "ECS metadata returned empty credentials";
+        return -1;
+    }
+
+    std::tm tm = {};
+    std::istringstream ss(expiry_str);
+    ss >> std::get_time(&tm, "%Y-%m-%dT%H:%M:%SZ");
+    if (ss.fail()) {
+        LOG(ERROR) << "Failed to parse expiration from ECS metadata: " << expiry_str;
+        return -1;
+    }
+
+    // timegm() reads the broken-down time as UTC.
+    out_expiration = std::chrono::system_clock::from_time_t(timegm(&tm));
+    out_credentials = std::make_unique<AlibabaCloud::OSS::Credentials>(ak, sk, token);
+    VLOG(1) << "ECS metadata credentials: ak=" << mask_credential(ak) << ", expiry=" << expiry_str;
+    return 0;
+}
+
+// ---- OSSSTSCredentialProvider ----
+
+// Stores the role ARN, region and external id; starts with no cached credentials
+// and an expiration of "now". Throws std::invalid_argument when `role_arn` is empty.
+OSSSTSCredentialProvider::OSSSTSCredentialProvider(const std::string& role_arn,
+                                                   const std::string& region,
+                                                   const std::string& external_id)
+        : _cached_credentials(nullptr),
+          _expiration(std::chrono::system_clock::now()),
+          _role_arn(role_arn),
+          _region(region),
+          _external_id(external_id) {
+    if (_role_arn.empty()) {
+        throw std::invalid_argument("RAM role ARN cannot be empty for STS AssumeRole");
+    }
+
+    // The external id is only ever logged in masked form.
+    const std::string external_id_for_log =
+            _external_id.empty() ? "(none)" : mask_credential(_external_id);
+    LOG(INFO) << "OSSSTSCredentialProvider: role_arn=" << _role_arn << ", region=" << _region
+              << ", external_id=" << external_id_for_log;
+}
+
+// True when the time left until _expiration is REFRESH_BEFORE_EXPIRY_SECONDS or less.
+bool OSSSTSCredentialProvider::_is_expired() const {
+    const auto remaining = std::chrono::duration_cast<std::chrono::seconds>(
+            _expiration - std::chrono::system_clock::now());
+    return remaining.count() <= REFRESH_BEFORE_EXPIRY_SECONDS;
+}
+
+// Returns credentials obtained through STS AssumeRole.
+//
+// Cached credentials are returned while _is_expired() is false. Otherwise new ones are
+// fetched with _fetch_credentials_from_sts(), which is called while _mtx is not held.
+// If the fetch fails, the previously cached credentials are returned when there are
+// any, and empty credentials ("", "", "") otherwise.
+AlibabaCloud::OSS::Credentials OSSSTSCredentialProvider::getCredentials() {
+    // Step 1 (under _mtx): serve from the cache when possible, otherwise log why a
+    // fetch is needed.
+    {
+        std::lock_guard<std::mutex> lock(_mtx);
+        if (_cached_credentials && !_is_expired()) {
+            VLOG(2) << "Returning cached STS AssumeRole credentials";
+            return *_cached_credentials;
+        }
+        if (_cached_credentials) {
+            auto t = std::chrono::system_clock::to_time_t(_expiration);
+            struct tm tm_buf;
+            LOG(INFO) << "STS credentials expiring ("
+                      << std::put_time(localtime_r(&t, &tm_buf), "%Y-%m-%d 
%H:%M:%S")
+                      << "), refreshing";
+        } else {
+            LOG(INFO) << "Fetching STS AssumeRole credentials (first time)";
+        }
+    }
+
+    // Step 2 (lock released): fetch into locals so the members are only touched
+    // under the lock.
+    std::unique_ptr<AlibabaCloud::OSS::Credentials> new_credentials;
+    std::chrono::system_clock::time_point new_expiration;
+
+    if (_fetch_credentials_from_sts(new_credentials, new_expiration) != 0) {
+        std::lock_guard<std::mutex> lock(_mtx);
+        if (_cached_credentials) {
+            LOG(WARNING) << "Using expired STS credentials as fallback";
+            return *_cached_credentials;
+        }
+        LOG(ERROR) << "Failed to fetch STS AssumeRole credentials and no 
cached fallback available";
+        return AlibabaCloud::OSS::Credentials("", "", "");
+    }
+
+    // Step 3 (under _mtx): publish the result, unless the cache already holds
+    // unexpired credentials by the time the lock is re-acquired.
+    {
+        std::lock_guard<std::mutex> lock(_mtx);
+        if (_cached_credentials && !_is_expired()) {
+            return *_cached_credentials;
+        }
+        _cached_credentials = std::move(new_credentials);
+        _expiration = new_expiration;
+        auto t = std::chrono::system_clock::to_time_t(_expiration);
+        struct tm tm_buf;
+        LOG(INFO) << "STS AssumeRole credentials refreshed, expiry: "
+                  << std::put_time(localtime_r(&t, &tm_buf), "%Y-%m-%d 
%H:%M:%S");
+        return *_cached_credentials;
+    }
+}
+
+int OSSSTSCredentialProvider::_fetch_credentials_from_sts(
+        std::unique_ptr<AlibabaCloud::OSS::Credentials>& out_credentials,
+        std::chrono::system_clock::time_point& out_expiration) {
+    try {
+        AlibabaCloud::Credentials::Models::Config cred_config;
+        cred_config.setType("ecs_ram_role");
+        AlibabaCloud::Credentials::Client cred_client(cred_config);
+        AlibabaCloud::Credentials::Models::CredentialModel base_cred = 
cred_client.getCredential();
+        LOG(INFO) << "STS AssumeRole base credentials from provider: "
+                  << base_cred.getProviderName();
+
+        AlibabaCloud::OpenApi::Utils::Models::Config config;
+        config.setAccessKeyId(base_cred.getAccessKeyId());
+        config.setAccessKeySecret(base_cred.getAccessKeySecret());
+        if (!base_cred.getSecurityToken().empty()) {
+            config.setSecurityToken(base_cred.getSecurityToken());
+        }
+        config.setRegionId(_region);
+        config.setEndpoint("sts." + _region + ".aliyuncs.com");
+
+        AlibabaCloud::Sts20150401::Client client(config);
+
+        AlibabaCloud::Sts20150401::Models::AssumeRoleRequest request;
+        request.setRoleArn(_role_arn);
+        request.setRoleSessionName("doris-oss-session");
+        request.setDurationSeconds(SESSION_DURATION_SECONDS);
+        if (!_external_id.empty()) {
+            request.setExternalId(_external_id);
+        }
+
+        Darabonba::RuntimeOptions runtime;
+        runtime.setIgnoreSSL(true);
+

Review Comment:
   **CRITICAL — Security: TLS verification disabled for STS credential 
exchange**
   
   ```cpp
   runtime.setIgnoreSSL(true);
   ```
   
   This unconditionally disables SSL certificate verification for the STS 
`AssumeRole` call, which transmits highly sensitive temporary credentials 
(AccessKeyId, AccessKeySecret, SecurityToken). A man-in-the-middle attack could 
intercept and steal these credentials.
   
   This must not remain in production code. Use the CA certificate 
configuration (similar to `OSSClientFactory`'s `_ca_cert_file_path` logic) to 
properly configure TLS, or at minimum make this configurable rather than always 
disabled.



##########
be/src/io/fs/oss_file_system.cpp:
##########
@@ -0,0 +1,623 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/oss_file_system.h"
+
+#include <alibabacloud/oss/OssClient.h>
+#include <fmt/format.h>
+
+#include <chrono>
+#include <filesystem>
+#include <fstream>
+#include <memory>
+#include <thread>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "io/fs/err_utils.h"
+#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/oss_file_reader.h"
+#include "io/fs/oss_file_writer.h"
+#include "io/fs/remote_file_system.h"
+
+namespace doris::io {
+namespace {
+
+constexpr int64_t MULTIPART_COPY_THRESHOLD = 1073741824; // 1GB
+constexpr int64_t MULTIPART_COPY_PART_SIZE = 104857600;  // 100MB per part
+
+#ifndef CHECK_OSS_CLIENT
+#define CHECK_OSS_CLIENT(client)                                 \
+    if (!client) {                                               \
+        return Status::InvalidArgument("init oss client error"); \
+    }
+#endif
+
+Result<std::string> get_oss_key(const Path& full_path) {
+    std::string path_str = full_path.native();
+
+    const std::string oss_prefix = "oss://";
+    if (path_str.find(oss_prefix) == 0) {
+        path_str = path_str.substr(oss_prefix.length());
+
+        size_t pos = path_str.find('/');
+        if (pos == std::string::npos) {
+            return ""; // No key, just bucket
+        }
+        return path_str.substr(pos + 1);
+    }
+
+    return path_str;
+}
+
+} // namespace
+
+OSSClientHolder::OSSClientHolder(OSSClientConf conf) : _conf(std::move(conf)) 
{}
+
+OSSClientHolder::~OSSClientHolder() = default;
+
+Status OSSClientHolder::init() {
+    _client = OSSClientFactory::instance().create(_conf);
+    if (!_client) {
+        return Status::InvalidArgument("failed to init oss client with conf 
{}", _conf.to_string());
+    }
+
+    return Status::OK();
+}
+
+Status OSSClientHolder::reset(const OSSClientConf& conf) {
+    OSSClientConf reset_conf;
+    {
+        std::shared_lock lock(_mtx);
+        if (conf.get_hash() == _conf.get_hash()) {
+            return Status::OK();
+        }
+
+        reset_conf = _conf;
+        reset_conf.ak = conf.ak;
+        reset_conf.sk = conf.sk;
+        reset_conf.token = conf.token;
+        reset_conf.bucket = conf.bucket;
+        reset_conf.connect_timeout_ms = conf.connect_timeout_ms;
+        reset_conf.max_connections = conf.max_connections;
+        reset_conf.request_timeout_ms = conf.request_timeout_ms;
+        reset_conf.cred_provider_type = conf.cred_provider_type;
+    }
+
+    auto client = OSSClientFactory::instance().create(reset_conf);
+    if (!client) {
+        return Status::InvalidArgument("failed to init oss client with conf 
{}", conf.to_string());
+    }
+
+    LOG(WARNING) << "reset oss client with new conf: " << conf.to_string();
+
+    {
+        std::lock_guard lock(_mtx);
+        _client = std::move(client);
+        _conf = std::move(reset_conf);
+    }
+
+    return Status::OK();
+}
+
+Result<int64_t> OSSClientHolder::object_file_size(const std::string& bucket,
+                                                  const std::string& key) 
const {
+    auto client = get();
+    if (!client) {
+        return ResultError(Status::InvalidArgument("init oss client error"));
+    }
+
+    auto outcome = client->HeadObject(bucket, key);
+    if (!outcome.isSuccess()) {
+        return ResultError(Status::IOError("failed to head oss file {}: {} - 
{}",
+                                           full_oss_path(bucket, key), 
outcome.error().Code(),
+                                           outcome.error().Message()));
+    }
+
+    return outcome.result().ContentLength();
+}
+
+std::string OSSClientHolder::full_oss_path(std::string_view bucket, 
std::string_view key) const {
+    return fmt::format("{}/{}/{}", _conf.endpoint, bucket, key);
+}
+
+std::string OSSFileSystem::full_oss_path(std::string_view key) const {
+    return _client->full_oss_path(_bucket, key);
+}
+
+Result<std::shared_ptr<OSSFileSystem>> OSSFileSystem::create(OSSConf oss_conf, 
std::string id) {
+    std::shared_ptr<OSSFileSystem> fs(new OSSFileSystem(std::move(oss_conf), 
std::move(id)));
+    RETURN_IF_ERROR_RESULT(fs->init());
+    return fs;
+}
+
+OSSFileSystem::OSSFileSystem(OSSConf oss_conf, std::string id)
+        : RemoteFileSystem(oss_conf.prefix, std::move(id), 
FileSystemType::OSS),
+          _bucket(std::move(oss_conf.bucket)),
+          _prefix(std::move(oss_conf.prefix)),
+          
_client(std::make_shared<OSSClientHolder>(std::move(oss_conf.client_conf))) {
+    if (!_prefix.empty()) {
+        size_t start = _prefix.find_first_not_of('/');
+        if (start == std::string::npos) {
+            _prefix = "";
+        } else {
+            size_t end = _prefix.find_last_not_of('/');
+            if (start > 0 || end < _prefix.size() - 1) {
+                _prefix = _prefix.substr(start, end - start + 1);
+            }
+        }
+    }
+}
+
+Status OSSFileSystem::init() {
+    return _client->init();
+}
+
+OSSFileSystem::~OSSFileSystem() = default;
+
+Status OSSFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
+                                       const FileWriterOptions* opts) {
+    auto key = DORIS_TRY(get_oss_key(file));
+    *writer = std::make_unique<OSSFileWriter>(_client, _bucket, key, opts);
+    return Status::OK();
+}
+
+Status OSSFileSystem::open_file_internal(const Path& file, FileReaderSPtr* 
reader,
+                                         const FileReaderOptions& opts) {
+    auto key = DORIS_TRY(get_oss_key(file));
+    int64_t fsize = opts.file_size;
+
+    auto oss_reader = DORIS_TRY(OSSFileReader::create(_client, _bucket, key, 
fsize));
+    *reader = oss_reader;
+    return Status::OK();
+}
+
+Status OSSFileSystem::create_directory_impl(const Path& dir, bool 
failed_if_exists) {
+    return Status::OK();
+}
+
+Status OSSFileSystem::delete_file_impl(const Path& file) {
+    auto client = _client->get();
+    CHECK_OSS_CLIENT(client);
+
+    auto key = DORIS_TRY(get_oss_key(file));
+
+    auto outcome = client->DeleteObject(_bucket, key);
+
+    if (!outcome.isSuccess()) {
+        std::string error_code = outcome.error().Code();
+        if (error_code != "NoSuchKey") {
+            return Status::IOError("failed to delete file {}: {} - {}", 
full_oss_path(key),
+                                   error_code, outcome.error().Message());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status OSSFileSystem::delete_directory_impl(const Path& dir) {
+    auto client = _client->get();
+    CHECK_OSS_CLIENT(client);
+
+    auto prefix = DORIS_TRY(get_oss_key(dir));
+    if (!prefix.empty() && prefix.back() != '/') {
+        prefix.push_back('/');
+    }
+
+    // Abort in-progress multipart uploads
+    {
+        bool is_truncated = true;
+        std::string key_marker;
+        std::string upload_id_marker;
+        int aborted_count = 0;
+
+        while (is_truncated) {
+            AlibabaCloud::OSS::ListMultipartUploadsRequest 
list_uploads_request(_bucket);
+            list_uploads_request.setPrefix(prefix);
+            list_uploads_request.setMaxUploads(1000);
+
+            if (!key_marker.empty()) {
+                list_uploads_request.setKeyMarker(key_marker);
+            }
+            if (!upload_id_marker.empty()) {
+                list_uploads_request.setUploadIdMarker(upload_id_marker);
+            }
+
+            auto list_uploads_outcome = 
client->ListMultipartUploads(list_uploads_request);
+            if (!list_uploads_outcome.isSuccess()) {
+                LOG(WARNING) << "Failed to list multipart uploads for prefix " 
<< prefix << ": "
+                             << list_uploads_outcome.error().Code() << " - "
+                             << list_uploads_outcome.error().Message()
+                             << ". Continuing with object deletion.";
+                break; // Don't fail deletion if listing uploads fails
+            }
+
+            const auto& uploads_result = list_uploads_outcome.result();
+            const auto& uploads = uploads_result.MultipartUploadList();
+
+            for (const auto& upload : uploads) {
+                AlibabaCloud::OSS::AbortMultipartUploadRequest 
abort_request(_bucket, upload.Key,
+                                                                             
upload.UploadId);
+                auto abort_outcome = 
client->AbortMultipartUpload(abort_request);
+
+                if (!abort_outcome.isSuccess()) {
+                    LOG(WARNING) << "Failed to abort multipart upload: key=" 
<< upload.Key
+                                 << " upload_id=" << upload.UploadId
+                                 << " error=" << abort_outcome.error().Code() 
<< " - "
+                                 << abort_outcome.error().Message();
+                    // Don't fail directory deletion if abort fails
+                } else {
+                    aborted_count++;
+                    VLOG(1) << "Aborted multipart upload: key=" << upload.Key
+                            << " upload_id=" << upload.UploadId;
+                }
+            }
+
+            is_truncated = uploads_result.IsTruncated();
+            key_marker = uploads_result.NextKeyMarker();
+            upload_id_marker = uploads_result.NextUploadIdMarker();
+        }
+
+        if (aborted_count > 0) {
+            LOG(INFO) << "Aborted " << aborted_count
+                      << " in-progress multipart uploads under prefix: " << 
prefix;
+        }
+    }
+
+    // Delete all objects with prefix
+    bool is_truncated = true;
+    std::string marker;
+
+    while (is_truncated) {
+        AlibabaCloud::OSS::ListObjectsRequest list_request(_bucket);
+        list_request.setPrefix(prefix);
+        list_request.setMaxKeys(1000);
+
+        if (!marker.empty()) {
+            list_request.setMarker(marker);
+        }
+
+        auto list_outcome = client->ListObjects(list_request);
+        if (!list_outcome.isSuccess()) {
+            return Status::IOError("failed to list objects for delete 
directory {}: {} - {}",
+                                   full_oss_path(prefix), 
list_outcome.error().Code(),
+                                   list_outcome.error().Message());
+        }
+
+        const auto& result = list_outcome.result();
+        const auto& objects = result.ObjectSummarys();
+
+        if (!objects.empty()) {
+            AlibabaCloud::OSS::DeletedKeyList keys;
+            for (const auto& obj : objects) {
+                keys.push_back(obj.Key());
+            }
+
+            AlibabaCloud::OSS::DeleteObjectsRequest delete_request(_bucket);
+            delete_request.setKeyList(keys);
+            auto delete_outcome = client->DeleteObjects(delete_request);
+
+            if (!delete_outcome.isSuccess()) {
+                return Status::IOError("failed to batch delete objects: {} - 
{}",
+                                       delete_outcome.error().Code(),
+                                       delete_outcome.error().Message());
+            }
+        }
+
+        is_truncated = result.IsTruncated();
+        marker = result.NextMarker();
+    }
+
+    return Status::OK();
+}
+
+Status OSSFileSystem::batch_delete_impl(const std::vector<Path>& remote_files) 
{
+    auto client = _client->get();
+    CHECK_OSS_CLIENT(client);
+
+    constexpr size_t max_delete_batch = 1000;
+    auto path_iter = remote_files.begin();
+
+    do {
+        AlibabaCloud::OSS::DeletedKeyList keys;
+        auto path_begin = path_iter;
+
+        for (; path_iter != remote_files.end() && (path_iter - path_begin < 
max_delete_batch);
+             ++path_iter) {
+            auto key = DORIS_TRY(get_oss_key(*path_iter));
+            keys.push_back(key);
+        }
+
+        if (keys.empty()) {
+            break;
+        }
+
+        AlibabaCloud::OSS::DeleteObjectsRequest request(_bucket);
+        request.setKeyList(keys);
+        auto outcome = client->DeleteObjects(request);
+
+        if (!outcome.isSuccess()) {
+            return Status::IOError("failed to batch delete objects: {} - {}",
+                                   outcome.error().Code(), 
outcome.error().Message());
+        }
+    } while (path_iter != remote_files.end());
+
+    return Status::OK();
+}
+
+Status OSSFileSystem::exists_impl(const Path& path, bool* res) const {
+    auto client = _client->get();
+    CHECK_OSS_CLIENT(client);
+
+    auto key = DORIS_TRY(get_oss_key(path));
+
+    *res = client->DoesObjectExist(_bucket, key);
+    return Status::OK();

Review Comment:
   **HIGH — `DoesObjectExist()` cannot distinguish "not found" from "error"**
   
   The OSS SDK's `DoesObjectExist()` returns a simple boolean. If the OSS 
request fails (network error, auth failure, bucket not found), it returns 
`false`, which is indistinguishable from "object does not exist." This means 
connectivity failures are silently treated as "file not found."
   
   The S3 counterpart uses `HeadObject` which returns proper error details. 
Consider using `HeadObject` instead:
   ```cpp
   auto outcome = client->HeadObject(_bucket, key);
   if (!outcome.isSuccess()) {
       if (outcome.error().Code() == "NoSuchKey" || outcome.error().Code() == "404") {
           *res = false;
           return Status::OK();
       }
       return Status::IOError("failed to check existence: ...");
   }
   *res = true;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to