This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d4439383e54 [feature](Cloud) Add azure's support to BE (#35670)
d4439383e54 is described below
commit d4439383e54004db01b0012d44c8003b5c547432
Author: AlexYue <[email protected]>
AuthorDate: Tue Jun 11 15:03:38 2024 +0800
[feature](Cloud) Add azure's support to BE (#35670)
As a follow-up to #35307, this PR links Azure into BE and implements the
corresponding ObjStorageClient interface for Azure.
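Roughly, the new pieces fit together like this (illustrative sketch using the names added in this patch, not an exact call site):

    // Illustrative call flow, assuming the names introduced in this patch.
    S3Conf conf = S3Conf::get_s3_conf(param.s3_storage_param); // maps TObjStorageType -> io::ObjStorageType
    auto client = S3ClientFactory::instance().create(conf.client_conf);
    // create() returns an AzureObjStorageClient when client_conf.provider == io::ObjStorageType::AZURE,
    // otherwise an S3ObjStorageClient wrapping an Aws::S3::S3Client.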
---
be/cmake/thirdparty.cmake | 5 +
be/src/agent/task_worker_pool.cpp | 28 +---
be/src/cloud/cloud_meta_mgr.cpp | 15 +-
be/src/io/fs/azure_obj_storage_client.cpp | 235 ++++++++++++++++++++++++++++++
be/src/io/fs/azure_obj_storage_client.h | 63 ++++++++
be/src/io/fs/obj_storage_client.h | 3 +-
be/src/io/fs/s3_file_system.cpp | 8 +-
be/src/util/s3_util.cpp | 148 +++++++++++++++++--
be/src/util/s3_util.h | 26 ++--
be/test/io/fs/azure_test.cpp | 96 ++++++++++++
be/test/io/fs/s3_file_writer_test.cpp | 3 +-
gensrc/proto/cloud.proto | 14 +-
gensrc/thrift/AgentService.thrift | 11 ++
13 files changed, 593 insertions(+), 62 deletions(-)
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 78f28fe72dc..41203a9006a 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -139,6 +139,11 @@ if (NOT OS_MACOSX)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
endif()
+add_thirdparty(azure-core)
+add_thirdparty(azure-identity)
+add_thirdparty(azure-storage-blobs)
+add_thirdparty(azure-storage-common)
+
add_thirdparty(minizip LIB64)
add_thirdparty(simdjson LIB64)
add_thirdparty(idn LIB64)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 31cfe19d4f0..a776be08f79 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -53,6 +53,7 @@
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
+#include "io/fs/obj_storage_client.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
@@ -1378,23 +1379,8 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
     if (!existed_fs) {
         // No such FS instance on BE
-        S3Conf s3_conf {
-                .bucket = param.s3_storage_param.bucket,
-                .prefix = param.s3_storage_param.root_path,
-                .client_conf = {
-                        .endpoint = param.s3_storage_param.endpoint,
-                        .region = param.s3_storage_param.region,
-                        .ak = param.s3_storage_param.ak,
-                        .sk = param.s3_storage_param.sk,
-                        .token = param.s3_storage_param.token,
-                        .max_connections = param.s3_storage_param.max_conn,
-                        .request_timeout_ms = param.s3_storage_param.request_timeout_ms,
-                        .connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
-                        // When using cold heat separation in minio, user might use ip address directly,
-                        // which needs enable use_virtual_addressing to true
-                        .use_virtual_addressing = !param.s3_storage_param.use_path_style,
-                }};
-        auto res = io::S3FileSystem::create(std::move(s3_conf), std::to_string(param.id));
+        auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
+                                            std::to_string(param.id));
         if (!res.has_value()) {
             st = std::move(res).error();
         } else {
@@ -1403,10 +1389,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
     } else {
         DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
         auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
+        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
         S3ClientConf conf {
-                .ak = param.s3_storage_param.ak,
-                .sk = param.s3_storage_param.sk,
-                .token = param.s3_storage_param.token,
+                .ak = std::move(new_s3_conf.client_conf.ak),
+                .sk = std::move(new_s3_conf.client_conf.sk),
+                .token = std::move(new_s3_conf.client_conf.token),
+                .provider = new_s3_conf.client_conf.provider,
         };
         st = client->reset(conf);
         fs = std::move(existed_fs);
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index a14ec2b0497..d55c884a6c2 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -46,6 +46,7 @@
#include "gen_cpp/Types_types.h"
#include "gen_cpp/cloud.pb.h"
#include "gen_cpp/olap_file.pb.h"
+#include "io/fs/obj_storage_client.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
@@ -825,17 +826,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
     }
     auto add_obj_store = [&vault_infos](const auto& obj_store) {
-        vault_infos->emplace_back(obj_store.id(),
-                                  S3Conf {
-                                          .bucket = obj_store.bucket(),
-                                          .prefix = obj_store.prefix(),
-                                          .client_conf {.endpoint = obj_store.endpoint(),
-                                                        .region = obj_store.region(),
-                                                        .ak = obj_store.ak(),
-                                                        .sk = obj_store.sk()},
-                                          .sse_enabled = obj_store.sse_enabled(),
-                                          .provider = obj_store.provider(),
-                                  },
+        vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
                                   StorageVaultPB_PathFormat {});
     };
@@ -853,7 +844,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
         resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
     }
     for (int i = 0; i < resp.storage_vault_size(); ++i) {
-        auto j = resp.mutable_storage_vault(i);
+        auto* j = resp.mutable_storage_vault(i);
         if (!j->has_obj_info()) continue;
         j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
     }
diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
new file mode 100644
index 00000000000..9569bf9a8e8
--- /dev/null
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/azure_obj_storage_client.h"
+
+#include <algorithm>
+#include <azure/core/io/body_stream.hpp>
+#include <azure/storage/blobs.hpp>
+#include <azure/storage/blobs/blob_client.hpp>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/blobs/rest_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
+#include <azure/storage/common/storage_exception.hpp>
+#include <iterator>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "io/fs/obj_storage_client.h"
+
+namespace {
+std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptions& opts) {
+    return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, opts.key, opts.prefix,
+                       opts.path.native());
+}
+} // namespace
+
+namespace doris::io {
+
+// As Azure's doc said, the batch size is 256
+// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+// > Each batch request supports a maximum of 256 subrequests.
+constexpr size_t BlobBatchMaxOperations = 256;
+
+template <typename Func>
+ObjectStorageResponse do_azure_client_call(Func f, const ObjectStoragePathOptions& opts) {
+    try {
+        f();
+    } catch (Azure::Storage::StorageException& e) {
+        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message,
+                               wrap_object_storage_path_msg(opts));
+        LOG_WARNING(msg);
+        return {.status = convert_to_obj_response(Status::InternalError<false>(std::move(msg))),
+                .http_code = static_cast<int>(e.StatusCode),
+                .request_id = std::move(e.RequestId)};
+    }
+    return {};
+}
+
+// Azure would do nothing
+ObjectStorageUploadResponse AzureObjStorageClient::create_multipart_upload(
+ const ObjectStoragePathOptions& opts) {
+ return {};
+}
+
+ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathOptions& opts,
+                                                         std::string_view stream) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    return do_azure_client_call(
+            [&]() {
+                client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
+            },
+            opts);
+}
+
+ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStoragePathOptions& opts,
+                                                               std::string_view stream,
+                                                               int part_num) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    try {
+        Azure::Core::IO::MemoryBodyStream memory_body(
+                reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
+        client.StageBlock(std::to_string(part_num), memory_body);
+    } catch (Azure::Storage::StorageException& e) {
+        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message,
+                               wrap_object_storage_path_msg(opts));
+        LOG_WARNING(msg);
+        // clang-format off
+        return {
+            .resp = {
+                .status = convert_to_obj_response(
+                        Status::InternalError<false>(std::move(msg))),
+                .http_code = static_cast<int>(e.StatusCode),
+                .request_id = std::move(e.RequestId),
+            },
+        };
+        // clang-format on
+    }
+    return {};
+}
+
+ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
+        const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    const auto& block_ids = static_cast<const AzureCompleteMultiParts&>(completed_parts).block_ids;
+    std::vector<std::string> string_block_ids;
+    std::ranges::transform(block_ids, std::back_inserter(string_block_ids),
+                           [](int i) { return std::to_string(i); });
+    return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
+}
+
+ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
+    try {
+        Azure::Storage::Blobs::Models::BlobProperties properties =
+                _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+        return {.file_size = properties.BlobSize};
+    } catch (Azure::Storage::StorageException& e) {
+        if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
+            return ObjectStorageHeadResponse {
+                    .resp = {.status = convert_to_obj_response(Status::NotFound<false>("")),
+                             .http_code = static_cast<int>(e.StatusCode),
+                             .request_id = std::move(e.RequestId)},
+            };
+        }
+        auto msg = fmt::format("Failed to head azure blob due to {}, path msg {}", e.Message,
+                               wrap_object_storage_path_msg(opts));
+        return ObjectStorageHeadResponse {
+                .resp = {.status = convert_to_obj_response(
+                                 Status::InternalError<false>(std::move(msg))),
+                         .http_code = static_cast<int>(e.StatusCode),
+                         .request_id = std::move(e.RequestId)},
+        };
+    }
+}
+
+ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathOptions& opts,
+                                                         void* buffer, size_t offset,
+                                                         size_t bytes_read, size_t* size_return) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    return do_azure_client_call(
+            [&]() {
+                Azure::Storage::Blobs::DownloadBlobToOptions download_opts;
+                download_opts.Range->Offset = offset;
+                download_opts.Range->Length = bytes_read;
+                client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read, download_opts);
+            },
+            opts);
+}
+
+ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePathOptions& opts,
+                                                           std::vector<FileInfo>* files) {
+    auto get_file_file = [&](Azure::Storage::Blobs::ListBlobsPagedResponse& resp) {
+        std::ranges::transform(resp.Blobs, std::back_inserter(*files), [](auto&& blob_item) {
+            return FileInfo {
+                    .file_name = blob_item.Name, .file_size = blob_item.BlobSize, .is_file = true};
+        });
+    };
+    return do_azure_client_call(
+            [&]() {
+                Azure::Storage::Blobs::ListBlobsOptions list_opts;
+                list_opts.Prefix = opts.prefix;
+                auto resp = _client->ListBlobs(list_opts);
+                get_file_file(resp);
+                while (!resp.NextPageToken->empty()) {
+                    list_opts.ContinuationToken = resp.NextPageToken;
+                    resp = _client->ListBlobs(list_opts);
+                    get_file_file(resp);
+                }
+            },
+            opts);
+}
+
+// As Azure's doc said, the batch size is 256
+// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+// > Each batch request supports a maximum of 256 subrequests.
+ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts,
+                                                             std::vector<std::string> objs) {
+    // TODO(ByteYue) : use ranges to adapt this code when the compiler is ready
+    // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
+    auto begin = std::begin(objs);
+    auto end = std::end(objs);
+
+    while (begin != end) {
+        auto batch = _client->CreateBatch();
+        auto chunkEnd = begin;
+        std::advance(chunkEnd, std::min(BlobBatchMaxOperations,
+                                        static_cast<size_t>(std::distance(begin, end))));
+        for (auto it = begin; it != chunkEnd; ++it) {
+            batch.DeleteBlob(*it);
+        }
+        begin = chunkEnd;
+        auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
+        if (resp.status.code != ErrorCode::OK) {
+            return resp;
+        }
+    }
+    return {};
+}
+
+ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
+    return do_azure_client_call([&]() { _client->DeleteBlob(opts.key); }, opts);
+}
+
+ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
+        const ObjectStoragePathOptions& opts) {
+    Azure::Storage::Blobs::ListBlobsOptions list_opts;
+    list_opts.Prefix = opts.prefix;
+    list_opts.PageSizeHint = BlobBatchMaxOperations;
+    auto resp = _client->ListBlobs(list_opts);
+    auto batch = _client->CreateBatch();
+    for (auto&& blob_item : resp.Blobs) {
+        batch.DeleteBlob(blob_item.Name);
+    }
+    auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
+    if (response.status.code != ErrorCode::OK) {
+        return response;
+    }
+    while (!resp.NextPageToken->empty()) {
+        batch = _client->CreateBatch();
+        list_opts.ContinuationToken = resp.NextPageToken;
+        resp = _client->ListBlobs(list_opts);
+        for (auto&& blob_item : resp.Blobs) {
+            batch.DeleteBlob(blob_item.Name);
+        }
+        auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
+        if (response.status.code != ErrorCode::OK) {
+            return response;
+        }
+    }
+    return {};
+}
+} // namespace doris::io
\ No newline at end of file
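The delete paths above go through Azure's blob batch API, which caps each batch at 256 subrequests, so keys are submitted in chunks. A minimal sketch of that pattern, assuming an already-constructed BlobContainerClient named container and a chunk of at most 256 keys:

    // Illustrative only; mirrors the chunked batch delete in delete_objects() above.
    auto batch = container.CreateBatch();
    for (const std::string& key : chunk_of_keys) { // at most 256 keys per chunk
        batch.DeleteBlob(key);
    }
    container.SubmitBatch(batch); // one round trip deletes the whole chunk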
diff --git a/be/src/io/fs/azure_obj_storage_client.h b/be/src/io/fs/azure_obj_storage_client.h
new file mode 100644
index 00000000000..a8c2db9d4db
--- /dev/null
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/fs/obj_storage_client.h"
+
+namespace Azure::Storage::Blobs {
+class BlobContainerClient;
+} // namespace Azure::Storage::Blobs
+
+namespace doris::io {
+
+struct AzureCompleteMultiParts : public ObjectCompleteMultiParts {
+ std::vector<int> block_ids;
+};
+
+class ObjClientHolder;
+
+class AzureObjStorageClient final : public ObjStorageClient {
+public:
+    AzureObjStorageClient(std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client)
+            : _client(std::move(client)) {}
+ ~AzureObjStorageClient() override = default;
+ ObjectStorageUploadResponse create_multipart_upload(
+ const ObjectStoragePathOptions& opts) override;
+ ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
+ std::string_view stream) override;
+    ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view,
+                                            int partNum) override;
+ ObjectStorageResponse complete_multipart_upload(
+ const ObjectStoragePathOptions& opts,
+ const ObjectCompleteMultiParts& completed_parts) override;
+    ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
+    ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
+                                     size_t offset, size_t bytes_read,
+                                     size_t* size_return) override;
+    ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
+                                       std::vector<FileInfo>* files) override;
+    ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
+                                         std::vector<std::string> objs) override;
+    ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override;
+    ObjectStorageResponse delete_objects_recursively(const ObjectStoragePathOptions& opts) override;
+
+private:
+ std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
+};
+
+} // namespace doris::io
\ No newline at end of file
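This client is constructed from an Azure BlobContainerClient; a hedged, self-contained sketch of how an instance could be created (the account and container values are placeholders; the real wiring is in _create_azure_client() in be/src/util/s3_util.cpp below):

    // Illustrative only; mirrors _create_azure_client() later in this patch.
    auto cred = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name, account_key);
    auto container = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(
            "https://<account>.blob.core.windows.net/<container>", cred);
    doris::io::AzureObjStorageClient client(std::move(container));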
diff --git a/be/src/io/fs/obj_storage_client.h b/be/src/io/fs/obj_storage_client.h
index 3ab0a8e2dea..2a99bde80f1 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -27,7 +27,8 @@ namespace io {
// Names are in lexico order.
enum class ObjStorageType : uint8_t {
- AWS = 0,
+ UNKNOWN = 0,
+ AWS = 1,
AZURE,
BOS,
COS,
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 27aff992f4c..648a80cda8c 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -79,12 +79,11 @@ ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {}
ObjClientHolder::~ObjClientHolder() = default;
Status ObjClientHolder::init() {
- auto client = S3ClientFactory::instance().create(_conf);
- if (!client) {
+ _client = S3ClientFactory::instance().create(_conf);
+ if (!_client) {
        return Status::InternalError("failed to init s3 client with conf {}", _conf.to_string());
}
- _client = std::make_shared<S3ObjStorageClient>(std::move(client));
return Status::OK();
}
@@ -100,6 +99,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) {
reset_conf.ak = conf.ak;
reset_conf.sk = conf.sk;
reset_conf.token = conf.token;
+ reset_conf.bucket = conf.bucket;
// Should check endpoint here?
}
@@ -112,7 +112,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) {
{
std::lock_guard lock(_mtx);
- _client = std::make_shared<S3ObjStorageClient>(std::move(client));
+ _client = std::move(client);
_conf = std::move(reset_conf);
}
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 6cf8e97962e..e89366f3ab8 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -29,6 +29,7 @@
#include <util/string_util.h>
#include <atomic>
+#include <azure/storage/blobs/blob_container_client.hpp>
#include <cstdlib>
#include <filesystem>
#include <functional>
@@ -40,6 +41,9 @@
#include "common/logging.h"
#include "common/status.h"
#include "common/sync_point.h"
+#include "io/fs/azure_obj_storage_client.h"
+#include "io/fs/obj_storage_client.h"
+#include "io/fs/s3_obj_storage_client.h"
#include "runtime/exec_env.h"
#include "s3_uri.h"
#include "vec/exec/scan/scanner_scheduler.h"
@@ -138,7 +142,7 @@ S3ClientFactory::S3ClientFactory() {
string S3ClientFactory::get_valid_ca_cert_path() {
    vector<std::string> vec_ca_file_path = doris::split(config::ca_cert_file_paths, ";");
- vector<std::string>::iterator it = vec_ca_file_path.begin();
+ auto it = vec_ca_file_path.begin();
for (; it != vec_ca_file_path.end(); ++it) {
if (std::filesystem::exists(*it)) {
return *it;
@@ -156,9 +160,7 @@ S3ClientFactory& S3ClientFactory::instance() {
return ret;
}
-std::shared_ptr<Aws::S3::S3Client> S3ClientFactory::create(const S3ClientConf& s3_conf) {
-    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_client_factory::create",
-                                      std::make_shared<Aws::S3::S3Client>());
+std::shared_ptr<io::ObjStorageClient> S3ClientFactory::create(const S3ClientConf& s3_conf) {
if (!is_s3_conf_valid(s3_conf)) {
return nullptr;
}
@@ -172,6 +174,36 @@ std::shared_ptr<Aws::S3::S3Client> S3ClientFactory::create(const S3ClientConf& s
}
}
+ auto obj_client = (s3_conf.provider == io::ObjStorageType::AZURE)
+ ? _create_azure_client(s3_conf)
+ : _create_s3_client(s3_conf);
+
+ {
+ uint64_t hash = s3_conf.get_hash();
+ std::lock_guard l(_lock);
+ _cache[hash] = obj_client;
+ }
+ return obj_client;
+}
+
+std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
+        const S3ClientConf& s3_conf) {
+    auto cred =
+            std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk);
+
+    const std::string container_name = s3_conf.bucket;
+    const std::string uri = fmt::format("{}://{}.blob.core.windows.net/{}",
+                                        config::s3_client_http_scheme, s3_conf.ak, container_name);
+
+    auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
+    return std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
+}
+
+std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
+        const S3ClientConf& s3_conf) {
+    TEST_SYNC_POINT_RETURN_WITH_VALUE(
+            "s3_client_factory::create",
+            std::make_shared<io::S3ObjStorageClient>(std::make_shared<Aws::S3::S3Client>()));
    Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration();
aws_config.endpointOverride = s3_conf.endpoint;
aws_config.region = s3_conf.region;
@@ -231,11 +263,8 @@ std::shared_ptr<Aws::S3::S3Client> S3ClientFactory::create(const S3ClientConf& s
s3_conf.use_virtual_addressing);
}
- {
- std::lock_guard l(_lock);
- _cache[hash] = new_client;
- }
- return new_client;
+    auto obj_client = std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
+ return obj_client;
}
Status S3ClientFactory::convert_properties_to_s3_conf(
@@ -293,4 +322,105 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
return Status::OK();
}
+S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
+ S3Conf ret {
+ .bucket = info.bucket(),
+ .prefix = info.prefix(),
+ .client_conf {
+ .endpoint = info.endpoint(),
+ .region = info.region(),
+ .ak = info.ak(),
+ .sk = info.sk(),
+ .bucket = info.bucket(),
+ .provider = io::ObjStorageType::AWS,
+ },
+ .sse_enabled = info.sse_enabled(),
+ };
+
+ io::ObjStorageType type = io::ObjStorageType::AWS;
+ switch (info.provider()) {
+ case cloud::ObjectStoreInfoPB_Provider_OSS:
+ type = io::ObjStorageType::OSS;
+ break;
+ case cloud::ObjectStoreInfoPB_Provider_S3:
+ type = io::ObjStorageType::AWS;
+ break;
+ case cloud::ObjectStoreInfoPB_Provider_COS:
+ type = io::ObjStorageType::COS;
+ break;
+ case cloud::ObjectStoreInfoPB_Provider_OBS:
+ type = io::ObjStorageType::OBS;
+ break;
+ case cloud::ObjectStoreInfoPB_Provider_BOS:
+ type = io::ObjStorageType::BOS;
+ break;
+ case cloud::ObjectStoreInfoPB_Provider_GCP:
+ type = io::ObjStorageType::GCP;
+ break;
+ case cloud::ObjectStoreInfoPB_Provider_AZURE:
+ type = io::ObjStorageType::AZURE;
+ break;
+ default:
+        LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string());
+ __builtin_unreachable();
+ }
+ ret.client_conf.provider = type;
+ return ret;
+}
+
+S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) {
+ S3Conf ret {
+ .bucket = param.bucket,
+ .prefix = param.root_path,
+ .client_conf = {
+ .endpoint = param.endpoint,
+ .region = param.region,
+ .ak = param.ak,
+ .sk = param.sk,
+ .token = param.token,
+ .bucket = param.bucket,
+ .provider = io::ObjStorageType::AWS,
+ .max_connections = param.max_conn,
+ .request_timeout_ms = param.request_timeout_ms,
+ .connect_timeout_ms = param.conn_timeout_ms,
+                    // When using cold heat separation in minio, user might use ip address directly,
+ // which needs enable use_virtual_addressing to true
+ .use_virtual_addressing = !param.use_path_style,
+ }};
+ io::ObjStorageType type = io::ObjStorageType::AWS;
+ switch (param.provider) {
+ case TObjStorageType::UNKNOWN:
+        LOG_INFO("Receive one legal storage resource, set provider type to aws, param detail {}",
+                 ret.to_string());
+ type = io::ObjStorageType::AWS;
+ break;
+ case TObjStorageType::AWS:
+ type = io::ObjStorageType::AWS;
+ break;
+ case TObjStorageType::AZURE:
+ type = io::ObjStorageType::AZURE;
+ break;
+ case TObjStorageType::BOS:
+ type = io::ObjStorageType::BOS;
+ break;
+ case TObjStorageType::COS:
+ type = io::ObjStorageType::COS;
+ break;
+ case TObjStorageType::OBS:
+ type = io::ObjStorageType::OBS;
+ break;
+ case TObjStorageType::OSS:
+ type = io::ObjStorageType::OSS;
+ break;
+ case TObjStorageType::GCP:
+ type = io::ObjStorageType::GCP;
+ break;
+ default:
+        LOG_FATAL("unknown provider type {}, info {}", param.provider, ret.to_string());
+ __builtin_unreachable();
+ }
+ ret.client_conf.provider = type;
+ return ret;
+}
+
} // end namespace doris
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 5dd68069759..1764b1b8b86 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -22,6 +22,7 @@
#include <aws/s3/S3Errors.h>
#include <bvar/bvar.h>
#include <fmt/format.h>
+#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/cloud.pb.h>
#include <stdint.h>
@@ -32,14 +33,14 @@
#include <unordered_map>
#include "common/status.h"
+#include "io/fs/obj_storage_client.h"
#include "util/s3_rate_limiter.h"
#include "vec/common/string_ref.h"
-namespace Aws {
-namespace S3 {
+namespace Aws::S3 {
class S3Client;
-} // namespace S3
-} // namespace Aws
+} // namespace Aws::S3
+
namespace bvar {
template <typename T>
class Adder;
@@ -95,6 +96,9 @@ struct S3ClientConf {
std::string ak;
std::string sk;
std::string token;
+    // For Azure, the bucket (container) must be provided up front when the blob container client is initialized
+ std::string bucket;
+ io::ObjStorageType provider = io::ObjStorageType::AWS;
int max_connections = -1;
int request_timeout_ms = -1;
int connect_timeout_ms = -1;
@@ -107,6 +111,7 @@ struct S3ClientConf {
hash_code ^= crc32_hash(token);
hash_code ^= crc32_hash(endpoint);
hash_code ^= crc32_hash(region);
+ hash_code ^= crc32_hash(bucket);
hash_code ^= max_connections;
hash_code ^= request_timeout_ms;
hash_code ^= connect_timeout_ms;
@@ -116,9 +121,9 @@ struct S3ClientConf {
std::string to_string() const {
return fmt::format(
-                "(ak={}, token={}, endpoint={}, region={}, max_connections={}, "
+                "(ak={}, token={}, endpoint={}, region={}, bucket={}, max_connections={}, "
                 "request_timeout_ms={}, connect_timeout_ms={}, use_virtual_addressing={}",
-                ak, token, endpoint, region, max_connections, request_timeout_ms,
+                ak, token, endpoint, region, bucket, max_connections, request_timeout_ms,
                 connect_timeout_ms, use_virtual_addressing);
}
};
@@ -129,7 +134,8 @@ struct S3Conf {
S3ClientConf client_conf;
bool sse_enabled = false;
- cloud::ObjectStoreInfoPB::Provider provider;
+ static S3Conf get_s3_conf(const cloud::ObjectStoreInfoPB&);
+ static S3Conf get_s3_conf(const TS3StorageParam&);
std::string to_string() const {
        return fmt::format("(bucket={}, prefix={}, client_conf={}, sse_enabled={})", bucket, prefix,
@@ -143,7 +149,7 @@ public:
static S3ClientFactory& instance();
- std::shared_ptr<Aws::S3::S3Client> create(const S3ClientConf& s3_conf);
+ std::shared_ptr<io::ObjStorageClient> create(const S3ClientConf& s3_conf);
    static Status convert_properties_to_s3_conf(const std::map<std::string, std::string>& prop,
                                                const S3URI& s3_uri, S3Conf* s3_conf);
@@ -161,12 +167,14 @@ public:
S3RateLimiterHolder* rate_limiter(S3RateLimitType type);
private:
+    std::shared_ptr<io::ObjStorageClient> _create_s3_client(const S3ClientConf& s3_conf);
+    std::shared_ptr<io::ObjStorageClient> _create_azure_client(const S3ClientConf& s3_conf);
S3ClientFactory();
static std::string get_valid_ca_cert_path();
Aws::SDKOptions _aws_options;
std::mutex _lock;
- std::unordered_map<uint64_t, std::shared_ptr<Aws::S3::S3Client>> _cache;
+ std::unordered_map<uint64_t, std::shared_ptr<io::ObjStorageClient>> _cache;
std::string _ca_cert_file_path;
std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
};
diff --git a/be/test/io/fs/azure_test.cpp b/be/test/io/fs/azure_test.cpp
new file mode 100644
index 00000000000..f158cf0a7b4
--- /dev/null
+++ b/be/test/io/fs/azure_test.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/core.h>
+#include <gtest/gtest.h>
+
+#include <azure/storage/blobs.hpp>
+#include <azure/storage/blobs/blob_client.hpp>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
+#include <cstdio>
+#include <iostream>
+#include <stdexcept>
+#include <utility>
+
+#include "common/config.h"
+
+namespace doris {
+
+std::string GetConnectionString() {
+ const static std::string ConnectionString = "";
+
+ if (!ConnectionString.empty()) {
+ return ConnectionString;
+ }
+    const static std::string envConnectionString = std::getenv("AZURE_STORAGE_CONNECTION_STRING");
+ if (!envConnectionString.empty()) {
+ return envConnectionString;
+ }
+ throw std::runtime_error("Cannot find connection string.");
+}
+
+TEST(AzureTest, Write) {
+    GTEST_SKIP() << "Skipping Azure test, because this test is to test the compile and linkage";
+ using namespace Azure::Storage::Blobs;
+
+ std::string accountName = config::test_s3_ak;
+ std::string accountKey = config::test_s3_sk;
+
+    auto cred =
+            std::make_shared<Azure::Storage::StorageSharedKeyCredential>(accountName, accountKey);
+
+ const std::string containerName = config::test_s3_bucket;
+ const std::string blobName = "sample-blob";
+ const std::string blobContent = "Fuck Azure!";
+    const std::string uri =
+            fmt::format("https://{}.blob.core.windows.net/{}", accountName, containerName);
+
+    // auto containerClient =
+    //         BlobContainerClient::CreateFromConnectionString(GetConnectionString(), containerName);
+
+ auto containerClient = BlobContainerClient(uri, cred);
+ containerClient.CreateIfNotExists();
+
+ std::vector<int> blockIds1;
+
+ auto blockBlobContainer = containerClient.GetBlockBlobClient(blobName);
+
+ // Azure::Storage::StorageException exception;
+
+ BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
+ std::vector<std::string> blockIds;
+
+ std::vector<uint8_t> buffer(blobContent.begin(), blobContent.end());
+ auto aresp = blobClient.UploadFrom(buffer.data(), buffer.size());
+
+    Azure::Storage::Metadata blobMetadata = {{"key1", "value1"}, {"key2", "value2"}};
+ blobClient.SetMetadata(blobMetadata);
+
+ auto properties = blobClient.GetProperties().Value;
+ for (auto metadata : properties.Metadata) {
+ std::cout << metadata.first << ":" << metadata.second << std::endl;
+ }
+ // We know blob size is small, so it's safe to cast here.
+ buffer.resize(static_cast<size_t>(properties.BlobSize));
+
+ blobClient.DownloadTo(buffer.data(), buffer.size());
+
+ std::cout << std::string(buffer.begin(), buffer.end()) << std::endl;
+}
+
+} // namespace doris
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index 75a49d813a4..4f3594dc5f0 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -263,7 +263,8 @@ static auto test_mock_callbacks = std::array {
pair->first = mock_client->head_object(req);
}},
MockCallback {"s3_client_factory::create", [](auto&& outcome) {
-                          auto pair = try_any_cast_ret<std::shared_ptr<Aws::S3::S3Client>>(outcome);
+                          auto pair = try_any_cast_ret<std::shared_ptr<io::S3ObjStorageClient>>(
+                                  outcome);
pair->second = true;
}}};
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 99c8c2647f2..e1c3c9be5ab 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -172,12 +172,14 @@ message ObjectStoreInfoPB {
// presigned url use
// oss,aws,cos,obs,bos
enum Provider {
- OSS = 0;
- S3 = 1;
- COS = 2;
- OBS = 3;
- BOS = 4;
- GCP = 5;
+        UNKNOWN = -1;
+ OSS = 0;
+ S3 = 1;
+ COS = 2;
+ OBS = 3;
+ BOS = 4;
+ GCP = 5;
+ AZURE = 6;
}
optional int64 ctime = 1;
optional int64 mtime = 2;
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 104adca70fa..cc5dc367915 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -61,6 +61,16 @@ enum TTabletType {
TABLET_TYPE_MEMORY = 1
}
+enum TObjStorageType {
+ UNKNOWN = 0,
+ AWS = 1,
+ AZURE = 2,
+ BOS = 3,
+ COS = 4,
+ OBS = 5,
+ OSS = 6,
+ GCP = 7
+}
struct TS3StorageParam {
1: optional string endpoint
@@ -74,6 +84,7 @@ struct TS3StorageParam {
9: optional string bucket
10: optional bool use_path_style = false
11: optional string token
+ 12: optional TObjStorageType provider
}
struct TStoragePolicy {