This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8ffb1704620 [feature](Cloud) Implement gcs accessor for compatibility
(#34081)
8ffb1704620 is described below
commit 8ffb170462028abd40fe166b9df8687f032bb020
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 11:20:41 2024 +0800
[feature](Cloud) Implement gcs accessor for compatibility (#34081)
---
cloud/src/recycler/s3_accessor.cpp | 33 +++++++++++-
cloud/src/recycler/s3_accessor.h | 9 ++++
cloud/test/s3_accessor_test.cpp | 103 ++++++++++++++++++++++++++++++++++++-
3 files changed, 142 insertions(+), 3 deletions(-)
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 543f84f87fc..d1ebfe62a1d 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -30,6 +30,8 @@
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
+#include <algorithm>
+#include <execution>
#include <utility>
#include "common/logging.h"
@@ -226,7 +228,21 @@ int S3Accessor::delete_objects(const
std::vector<std::string>& relative_paths) {
}
int S3Accessor::delete_object(const std::string& relative_path) {
- // TODO(cyx)
+ Aws::S3::Model::DeleteObjectRequest request;
+ auto key = get_key(relative_path);
+ request.WithBucket(conf_.bucket).WithKey(key);
+ auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request),
+ "s3_client::delete_object",
request);
+ if (!outcome.IsSuccess()) {
+ LOG_WARNING("failed to delete object")
+ .tag("endpoint", conf_.endpoint)
+ .tag("bucket", conf_.bucket)
+ .tag("key", key)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("exception", outcome.GetError().GetExceptionName());
+ return -1;
+ }
return 0;
}
@@ -422,5 +438,20 @@ int S3Accessor::check_bucket_versioning() {
return 0;
}
+int GcsAccessor::delete_objects(const std::vector<std::string>&
relative_paths) {
+ std::vector<int> delete_rets(relative_paths.size());
+ std::transform(std::execution::par, relative_paths.begin(),
relative_paths.end(),
+ delete_rets.begin(),
+ [this](const std::string& path) { return
delete_object(path); });
+ int ret = 0;
+ for (int delete_ret : delete_rets) {
+ if (delete_ret != 0) {
+ ret = delete_ret;
+ break;
+ }
+ }
+ return ret;
+}
+
#undef HELP_MACRO
} // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 10291cfd4ba..1025ceab52e 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -90,4 +90,13 @@ private:
std::string path_;
};
+class GcsAccessor final : public S3Accessor {
+public:
+ explicit GcsAccessor(S3Conf conf) : S3Accessor(std::move(conf)) {}
+ ~GcsAccessor() override = default;
+
+ // returns 0 for success otherwise error
+ int delete_objects(const std::vector<std::string>& relative_paths)
override;
+};
+
} // namespace doris::cloud
diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp
index bb8b7c27bd9..972505c3999 100644
--- a/cloud/test/s3_accessor_test.cpp
+++ b/cloud/test/s3_accessor_test.cpp
@@ -58,6 +58,8 @@ public:
const Aws::S3::Model::ListObjectsV2Request& req) = 0;
virtual Aws::S3::Model::DeleteObjectsOutcome DeleteObjects(
const Aws::S3::Model::DeleteObjectsRequest& req) = 0;
+ virtual Aws::S3::Model::DeleteObjectOutcome DeleteObject(
+ const Aws::S3::Model::DeleteObjectRequest& req) = 0;
virtual Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& req) = 0;
virtual Aws::S3::Model::HeadObjectOutcome HeadObject(
@@ -122,6 +124,13 @@ public:
return Aws::S3::Model::DeleteObjectsOutcome(std::move(result));
}
+ Aws::S3::Model::DeleteObjectOutcome DeleteObject(
+ const Aws::S3::Model::DeleteObjectRequest& req) override {
+ Aws::S3::Model::DeleteObjectResult result;
+ _mock_fs->delete_object(req.GetKey());
+ return Aws::S3::Model::DeleteObjectOutcome(std::move(result));
+ }
+
Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& req) override {
Aws::S3::Model::PutObjectResult result;
@@ -207,6 +216,18 @@ public:
return Aws::S3::Model::DeleteObjectsOutcome(std::move(err));
}
+ Aws::S3::Model::DeleteObjectOutcome DeleteObject(
+ const Aws::S3::Model::DeleteObjectRequest& req) override {
+ if (!return_error_for_error_s3_client) {
+ return _correct_impl->DeleteObject(req);
+ }
+ auto err =
Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
+ false);
+ err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
+ // return -1
+ return Aws::S3::Model::DeleteObjectOutcome(std::move(err));
+ }
+
Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& req) override {
if (!return_error_for_error_s3_client) {
@@ -267,6 +288,10 @@ public:
return _impl->DeleteObjects(req);
}
+ auto DeleteObject(const Aws::S3::Model::DeleteObjectRequest& req) {
+ return _impl->DeleteObject(req);
+ }
+
auto PutObject(const Aws::S3::Model::PutObjectRequest& req) { return
_impl->PutObject(req); }
auto HeadObject(const Aws::S3::Model::HeadObjectRequest& req) { return
_impl->HeadObject(req); }
@@ -304,6 +329,12 @@ static auto callbacks = std::array {
Aws::S3::Model::DeleteObjectsRequest*>*)p;
*pair.first =
(*_mock_client).DeleteObjects(*pair.second);
}},
+ MockCallable {"s3_client::delete_object",
+ [](void* p) {
+ auto pair =
*(std::pair<Aws::S3::Model::DeleteObjectOutcome*,
+
Aws::S3::Model::DeleteObjectRequest*>*)p;
+ *pair.first =
(*_mock_client).DeleteObject(*pair.second);
+ }},
MockCallable {"s3_client::put_object",
[](void* p) {
auto pair =
*(std::pair<Aws::S3::Model::PutObjectOutcome*,
@@ -614,8 +645,7 @@ TEST(S3AccessorTest, exist_error) {
ASSERT_EQ(-1, accessor->exist(prefix));
}
-// function is not implemented
-TEST(S3AccessorTest, DISABLED_delete_object) {
+TEST(S3AccessorTest, delete_object) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
@@ -641,6 +671,75 @@ TEST(S3AccessorTest, DISABLED_delete_object) {
}
}
+TEST(S3AccessorTest, gcs_delete_objects) {
+ _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
+ _mock_client = std::make_unique<MockS3Client>();
+ auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+ auto sp = SyncPoint::get_instance();
+ std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
+ sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
+ [](void* p) { *((bool*)p) = true; });
+ sp->set_call_back(mock_callback.point_name, mock_callback.func);
+ });
+ sp->enable_processing();
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ sp->disable_processing();
+ std::for_each(callbacks.begin(), callbacks.end(), [&](const
MockCallable& mock_callback) {
+ sp->clear_call_back(mock_callback.point_name);
+ });
+ });
+ std::string prefix = "test_delete_object";
+ std::vector<std::string> paths;
+ size_t num = 300;
+ for (size_t i = 0; i < num; i++) {
+ auto path = fmt::format("{}{}", prefix, i);
+ _mock_fs->put_object(path, "");
+ paths.emplace_back(std::move(path));
+ }
+ ASSERT_EQ(0, accessor->delete_objects(paths));
+ for (size_t i = 0; i < num; i++) {
+ auto path = fmt::format("{}{}", prefix, i);
+ ASSERT_EQ(1, accessor->exist(path));
+ }
+}
+
+TEST(S3AccessorTest, gcs_delete_objects_error) {
+ _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
+ _mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
+ auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+ auto sp = SyncPoint::get_instance();
+ std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
+ sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
+ [](void* p) { *((bool*)p) = true; });
+ sp->set_call_back(mock_callback.point_name, mock_callback.func);
+ });
+ sp->enable_processing();
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ sp->disable_processing();
+ std::for_each(callbacks.begin(), callbacks.end(), [&](const
MockCallable& mock_callback) {
+ sp->clear_call_back(mock_callback.point_name);
+ });
+ return_error_for_error_s3_client = false;
+ });
+ std::string prefix = "test_delete_objects";
+ std::vector<std::string> paths_first_half;
+ std::vector<std::string> paths_second_half;
+ size_t num = 300;
+ for (size_t i = 0; i < num; i++) {
+ auto path = fmt::format("{}{}", prefix, i);
+ _mock_fs->put_object(path, "");
+ if (i < 150) {
+ paths_first_half.emplace_back(std::move(path));
+ } else {
+ paths_second_half.emplace_back(std::move(path));
+ }
+ }
+ std::vector<std::string> empty;
+ ASSERT_EQ(0, accessor->delete_objects(empty));
+ return_error_for_error_s3_client = true;
+ ASSERT_EQ(-1, accessor->delete_objects(paths_first_half));
+}
+
TEST(S3AccessorTest, delete_objects) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]