This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5b31ceae386 [feature-wip](Cloud) Add azure obj client into recycler
(#35849)
5b31ceae386 is described below
commit 5b31ceae3869ebf086232473fe12620b19297108
Author: AlexYue <[email protected]>
AuthorDate: Wed Jun 5 14:16:17 2024 +0800
[feature-wip](Cloud) Add azure obj client into recycler (#35849)
As a follow-up PR to #35307, this PR links Azure into the recycler
and implements the corresponding ObjStorageClient interface for
Azure.
---
cloud/cmake/thirdparty.cmake | 6 +
cloud/src/recycler/azure_obj_client.cpp | 202 +++++++++++++++++++++
.../{s3_obj_client.h => azure_obj_client.h} | 19 +-
cloud/src/recycler/obj_store_accessor.h | 10 +-
cloud/src/recycler/s3_accessor.cpp | 17 +-
cloud/src/recycler/s3_accessor.h | 2 +-
cloud/src/recycler/s3_obj_client.h | 2 +-
7 files changed, 243 insertions(+), 15 deletions(-)
diff --git a/cloud/cmake/thirdparty.cmake b/cloud/cmake/thirdparty.cmake
index 0e148896bfc..bacd7d25b3d 100644
--- a/cloud/cmake/thirdparty.cmake
+++ b/cloud/cmake/thirdparty.cmake
@@ -102,6 +102,12 @@ add_thirdparty(lzma LIB64)
add_thirdparty(idn LIB64)
add_thirdparty(gsasl)
# end krb5 libs
+# begin azure libs
+add_thirdparty(azure-core)
+add_thirdparty(azure-identity)
+add_thirdparty(azure-storage-blobs)
+add_thirdparty(azure-storage-common)
+# end azure libs
add_thirdparty(gtest NOTADD)
add_thirdparty(gtest_main NOTADD)
diff --git a/cloud/src/recycler/azure_obj_client.cpp
b/cloud/src/recycler/azure_obj_client.cpp
new file mode 100644
index 00000000000..179ebd980fa
--- /dev/null
+++ b/cloud/src/recycler/azure_obj_client.cpp
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/azure_obj_client.h"
+
+#include <fmt/core.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <azure/core/io/body_stream.hpp>
+#include <azure/storage/blobs.hpp>
+#include <azure/storage/blobs/blob_client.hpp>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/blobs/rest_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
+#include <azure/storage/common/storage_exception.hpp>
+#include <iterator>
+
+#include "recycler/obj_store_accessor.h"
+
+namespace doris::cloud {
+
+constexpr size_t BlobBatchMaxOperations = 256;
+
+template <typename Func>
+ObjectStorageResponse do_azure_client_call(Func f) {
+ try {
+ f();
+ } catch (Azure::Storage::StorageException& e) {
+ return {-1, fmt::format("Azure request failed because {}, http code
{}, request id {}",
+ e.Message, static_cast<int>(e.StatusCode),
e.RequestId)};
+ }
+ return {};
+}
+
+ObjectStorageResponse AzureObjClient::put_object(const
ObjectStoragePathOptions& opts,
+ std::string_view stream) {
+ auto client = _client->GetBlockBlobClient(opts.key);
+ return do_azure_client_call([&]() {
+ client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
stream.size());
+ });
+}
+
+ObjectStorageResponse AzureObjClient::head_object(const
ObjectStoragePathOptions& opts) {
+ try {
+ Azure::Storage::Blobs::Models::BlobProperties properties =
+ _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+ return {};
+ } catch (Azure::Storage::StorageException& e) {
+ if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
+ return {1};
+ }
+ return {-1, fmt::format("Failed to head azure blob due to {}, http
code {}, request id {}",
+ e.Message, static_cast<int>(e.StatusCode),
e.RequestId)};
+ }
+}
+
+ObjectStorageResponse AzureObjClient::list_objects(const
ObjectStoragePathOptions& opts,
+ std::vector<ObjectMeta>*
files) {
+ auto get_object_meta = [&](auto&& resp) {
+ std::ranges::transform(
+ resp.Blobs, std::back_inserter(*files), [](auto&& blob_item)
-> ObjectMeta {
+ return {.path = std::move(blob_item.Name),
+ .size = blob_item.BlobSize,
+ .last_modify_second =
+
blob_item.Details.LastModified.time_since_epoch().count()};
+ });
+ };
+ return do_azure_client_call([&]() {
+ Azure::Storage::Blobs::ListBlobsOptions list_opts;
+ list_opts.Prefix = opts.prefix;
+ auto resp = _client->ListBlobs(list_opts);
+ get_object_meta(resp);
+ while (!resp.NextPageToken->empty()) {
+ list_opts.ContinuationToken = resp.NextPageToken;
+ resp = _client->ListBlobs(list_opts);
+ get_object_meta(resp);
+ }
+ });
+}
+
+// As Azure's doc said, the batch size is 256
+// You can find out the num in
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+// > Each batch request supports a maximum of 256 subrequests.
+ObjectStorageResponse AzureObjClient::delete_objects(const
ObjectStoragePathOptions& opts,
+ std::vector<std::string>
objs) {
+ // TODO(ByteYue) : use range to adate this code when compiler is ready
+ // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
+ auto begin = std::begin(objs);
+ auto end = std::end(objs);
+
+ while (begin != end) {
+ auto batch = _client->CreateBatch();
+ auto chunkEnd = begin;
+ std::advance(chunkEnd, std::min(BlobBatchMaxOperations,
+
static_cast<size_t>(std::distance(begin, end))));
+ for (auto it = begin; it != chunkEnd; ++it) {
+ batch.DeleteBlob(*it);
+ }
+ begin = chunkEnd;
+ auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch);
});
+ if (resp.ret != 0) {
+ return resp;
+ }
+ }
+ return {};
+}
+
+ObjectStorageResponse AzureObjClient::delete_object(const
ObjectStoragePathOptions& opts) {
+ return do_azure_client_call([&]() { _client->DeleteBlob(opts.key); });
+}
+
+ObjectStorageResponse AzureObjClient::delete_objects_recursively(
+ const ObjectStoragePathOptions& opts) {
+ Azure::Storage::Blobs::ListBlobsOptions list_opts;
+ list_opts.Prefix = opts.prefix;
+ list_opts.PageSizeHint = BlobBatchMaxOperations;
+ auto resp = _client->ListBlobs(list_opts);
+ auto batch = _client->CreateBatch();
+ for (auto&& blob_item : resp.Blobs) {
+ batch.DeleteBlob(blob_item.Name);
+ }
+ auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch);
});
+ if (response.ret != 0) {
+ return response;
+ }
+ while (!resp.NextPageToken->empty()) {
+ batch = _client->CreateBatch();
+ list_opts.ContinuationToken = resp.NextPageToken;
+ resp = _client->ListBlobs(list_opts);
+ for (auto&& blob_item : resp.Blobs) {
+ batch.DeleteBlob(blob_item.Name);
+ }
+ auto response = do_azure_client_call([&]() {
_client->SubmitBatch(batch); });
+ if (response.ret != 0) {
+ return response;
+ }
+ }
+ return {};
+}
+
+ObjectStorageResponse AzureObjClient::delete_expired(const
ObjectStorageDeleteExpiredOptions& opts,
+ int64_t expired_time) {
+ Azure::Storage::Blobs::ListBlobsOptions list_opts;
+ list_opts.Prefix = opts.path_opts.prefix;
+ list_opts.PageSizeHint = BlobBatchMaxOperations;
+ auto resp = _client->ListBlobs(list_opts);
+ auto batch = _client->CreateBatch();
+ for (auto&& blob_item : resp.Blobs) {
+ batch.DeleteBlob(blob_item.Name);
+ }
+ auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch);
});
+ if (response.ret != 0) {
+ return response;
+ }
+ while (!resp.NextPageToken->empty()) {
+ batch = _client->CreateBatch();
+ list_opts.ContinuationToken = resp.NextPageToken;
+ resp = _client->ListBlobs(list_opts);
+ for (auto&& blob_item : resp.Blobs) {
+ if (blob_item.Details.LastModified.time_since_epoch().count() <
expired_time) {
+ batch.DeleteBlob(blob_item.Name);
+ }
+ }
+ auto response = do_azure_client_call([&]() {
_client->SubmitBatch(batch); });
+ if (response.ret != 0) {
+ return response;
+ }
+ }
+ return {};
+}
+
+ObjectStorageResponse AzureObjClient::get_life_cycle(const
ObjectStoragePathOptions& opts,
+ int64_t* expiration_days)
{
+ return {-1};
+}
+
+ObjectStorageResponse AzureObjClient::check_versioning(const
ObjectStoragePathOptions& opts) {
+ return {-1};
+}
+
+const std::shared_ptr<Aws::S3::S3Client>& AzureObjClient::s3_client() {
+ CHECK(true) << "Currently this is unreachable";
+ // TODO(ByteYue): use std::unreachable() instead when compiler supports it
+ __builtin_unreachable();
+}
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/s3_obj_client.h
b/cloud/src/recycler/azure_obj_client.h
similarity index 80%
copy from cloud/src/recycler/s3_obj_client.h
copy to cloud/src/recycler/azure_obj_client.h
index 891474b5289..79b129ed297 100644
--- a/cloud/src/recycler/s3_obj_client.h
+++ b/cloud/src/recycler/azure_obj_client.h
@@ -21,16 +21,16 @@
#include "recycler/obj_store_accessor.h"
-namespace Aws::S3 {
-class S3Client;
-} // namespace Aws::S3
+namespace Azure::Storage::Blobs {
+class BlobContainerClient;
+} // namespace Azure::Storage::Blobs
namespace doris::cloud {
-
-class S3ObjClient : public ObjStorageClient {
+class AzureObjClient : public ObjStorageClient {
public:
- S3ObjClient(std::shared_ptr<Aws::S3::S3Client> client) :
s3_client_(std::move(client)) {}
- ~S3ObjClient() override = default;
+ AzureObjClient(std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient>
client)
+ : _client(std::move(client)) {}
+ ~AzureObjClient() override = default;
ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
std::string_view stream) override;
@@ -48,10 +48,9 @@ public:
ObjectStorageResponse check_versioning(const ObjectStoragePathOptions&
opts) override;
- const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_;
}
+ const std::shared_ptr<Aws::S3::S3Client>& s3_client() override;
private:
- std::shared_ptr<Aws::S3::S3Client> s3_client_;
+ std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
};
-
} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/obj_store_accessor.h
b/cloud/src/recycler/obj_store_accessor.h
index a29266133ca..0238a317880 100644
--- a/cloud/src/recycler/obj_store_accessor.h
+++ b/cloud/src/recycler/obj_store_accessor.h
@@ -18,9 +18,14 @@
#pragma once
#include <functional>
+#include <memory>
#include <string>
#include <vector>
+namespace Aws::S3 {
+class S3Client;
+} // namespace Aws::S3
+
namespace doris::cloud {
struct ObjectMeta {
@@ -32,6 +37,7 @@ struct ObjectMeta {
enum class AccessorType {
S3,
HDFS,
+ AZURE,
};
// TODO(plat1ko): Redesign `Accessor` interface to adapt to storage vaults
other than S3 style
@@ -86,7 +92,7 @@ struct ObjectStorageDeleteExpiredOptions {
struct ObjectCompleteMultiParts {};
struct ObjectStorageResponse {
- ObjectStorageResponse(int r, std::string msg = "") : ret(r),
error_msg(std::move(msg)) {}
+ ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r),
error_msg(std::move(msg)) {}
// clang-format off
int ret {0}; // To unify the error handle logic with BE, we'd better use
the same error code as BE
// clang-format on
@@ -124,6 +130,8 @@ public:
int64_t* expiration_days) = 0;
// Check if the objects' versioning is on or off
virtual ObjectStorageResponse check_versioning(const
ObjectStoragePathOptions& opts) = 0;
+
+ virtual const std::shared_ptr<Aws::S3::S3Client>& s3_client() = 0;
};
} // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 04f642f3831..43341ab63d3 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -31,14 +31,16 @@
#include <aws/s3/model/PutObjectRequest.h>
#include <algorithm>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
#include <execution>
-#include <type_traits>
+#include <memory>
#include <utility>
#include "common/config.h"
#include "common/logging.h"
-#include "common/sync_point.h"
#include "rate-limiter/s3_rate_limiter.h"
+#include "recycler/azure_obj_client.h"
#include "recycler/obj_store_accessor.h"
#include "recycler/s3_obj_client.h"
@@ -138,6 +140,17 @@ std::string S3Accessor::get_relative_path(const
std::string& key) const {
int S3Accessor::init() {
static S3Environment s3_env;
+ if (type() == AccessorType::AZURE) {
+ auto cred =
+
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak,
conf_.sk);
+ const std::string container_name = conf_.bucket;
+ const std::string uri =
+ fmt::format("http://{}.blob.core.windows.net/{}", conf_.ak,
container_name);
+ auto container_client =
+
std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
+ obj_client_ =
std::make_shared<AzureObjClient>(std::move(container_client));
+ return 0;
+ }
Aws::Auth::AWSCredentials aws_cred(conf_.ak, conf_.sk);
Aws::Client::ClientConfiguration aws_config;
aws_config.endpointOverride = conf_.endpoint;
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 5221f90cafc..3ebaf89cff3 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -93,7 +93,7 @@ private:
private:
S3Conf conf_;
std::string path_;
- std::shared_ptr<S3ObjClient> obj_client_;
+ std::shared_ptr<ObjStorageClient> obj_client_;
};
class GcsAccessor final : public S3Accessor {
diff --git a/cloud/src/recycler/s3_obj_client.h
b/cloud/src/recycler/s3_obj_client.h
index 891474b5289..4f6ef9eab08 100644
--- a/cloud/src/recycler/s3_obj_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -48,7 +48,7 @@ public:
ObjectStorageResponse check_versioning(const ObjectStoragePathOptions&
opts) override;
- const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_;
}
+ const std::shared_ptr<Aws::S3::S3Client>& s3_client() override { return
s3_client_; }
private:
std::shared_ptr<Aws::S3::S3Client> s3_client_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]