This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b598c8b3959 [feature](Azure) Implement Azure's multi part upload in BE (#36620)
b598c8b3959 is described below
commit b598c8b3959faa2f0de65f895a9186aa441e6319
Author: AlexYue <[email protected]>
AuthorDate: Sat Jun 22 00:17:50 2024 +0800
[feature](Azure) Implement Azure's multi part upload in BE (#36620)
Previously, multipart upload for Azure was not supported; this PR makes it available.
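For context, Azure Block Blobs have no direct CompleteMultipartUpload equivalent; an
upload is performed by staging individual blocks and then committing the ordered block
list, which is what this patch maps the ObjStorageClient interface onto. Below is a
minimal sketch of that flow, assuming an already-authenticated BlockBlobClient; the
helper upload_in_blocks is hypothetical, while StageBlock/CommitBlockList are the real
SDK calls used by the patch.

    #include <aws/core/utils/HashingUtils.h>
    #include <azure/core/io/body_stream.hpp>
    #include <azure/storage/blobs.hpp>
    #include <string>
    #include <vector>

    void upload_in_blocks(Azure::Storage::Blobs::BlockBlobClient& client,
                          const std::vector<std::string>& parts) {
        std::vector<std::string> block_ids;
        for (int i = 0; i < static_cast<int>(parts.size()); ++i) {
            int part_num = i + 1;
            // Azure requires block IDs to be valid base64 of equal encoded length.
            std::string block_id = Aws::Utils::HashingUtils::Base64Encode(
                    {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
            Azure::Core::IO::MemoryBodyStream body(
                    reinterpret_cast<const uint8_t*>(parts[i].data()), parts[i].size());
            client.StageBlock(block_id, body);
            block_ids.push_back(std::move(block_id));
        }
        // Nothing becomes visible until the block list is committed.
        client.CommitBlockList(block_ids);
    }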
---
be/src/io/fs/azure_obj_storage_client.cpp | 20 +++++++++++++-----
be/src/io/fs/azure_obj_storage_client.h | 6 +-----
be/src/io/fs/obj_storage_client.h | 7 +++++--
be/src/io/fs/s3_file_writer.cpp | 25 +++++++----------------
be/src/io/fs/s3_file_writer.h | 6 ++----
be/src/io/fs/s3_obj_storage_client.cpp | 13 ++++++++----
be/src/io/fs/s3_obj_storage_client.h | 7 +------
be/src/util/s3_util.cpp | 22 +++++++++++++++++++-
be/src/util/s3_util.h | 9 --------
be/src/vec/sink/writer/vhive_partition_writer.cpp | 2 +-
10 files changed, 62 insertions(+), 55 deletions(-)
diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
index 941470f93c3..4bd0d1b7009 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -17,6 +17,9 @@
#include "io/fs/azure_obj_storage_client.h"
+#include <aws/core/utils/Array.h>
+#include <aws/core/utils/HashingUtils.h>
+
#include <algorithm>
#include <azure/core/http/http.hpp>
#include <azure/core/io/body_stream.hpp>
@@ -38,6 +41,11 @@ std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptio
return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, opts.key, opts.prefix,
opts.path.native());
}
+
+auto base64_encode_part_num(int part_num) {
+ return Aws::Utils::HashingUtils::Base64Encode(
+ {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
+}
} // namespace
namespace doris::io {
@@ -90,7 +98,8 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
try {
Azure::Core::IO::MemoryBodyStream memory_body(
reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
- client.StageBlock(std::to_string(part_num), memory_body);
+ // The blockId must be base64 encoded
+ auto resp = client.StageBlock(base64_encode_part_num(part_num), memory_body);
} catch (Azure::Core::RequestFailedException& e) {
auto msg = fmt::format("Azure request failed because {}, error msg {}, path msg {}",
e.what(), e.Message, wrap_object_storage_path_msg(opts));
@@ -110,12 +119,13 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
}
ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
- const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
+ const ObjectStoragePathOptions& opts,
+ const std::vector<ObjectCompleteMultiPart>& completed_parts) {
auto client = _client->GetBlockBlobClient(opts.key);
- const auto& block_ids = static_cast<const AzureCompleteMultiParts&>(completed_parts).block_ids;
std::vector<std::string> string_block_ids;
- std::ranges::transform(block_ids, std::back_inserter(string_block_ids),
- [](int i) { return std::to_string(i); });
+ std::ranges::transform(
+ completed_parts, std::back_inserter(string_block_ids),
+ [](const ObjectCompleteMultiPart& i) { return base64_encode_part_num(i.part_num); });
return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
}
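Note on the StageBlock change above: Azure rejects block IDs that are not valid base64,
and all block IDs within one blob must have the same encoded length, so the old
std::to_string(part_num) IDs break once part numbers with different digit counts mix.
Encoding the raw 4-byte part number yields fixed-width, valid IDs. A hypothetical
standalone illustration, reusing the AWS helper the backend already links against:

    #include <aws/core/utils/HashingUtils.h>
    #include <cassert>

    int main() {
        int a = 1;
        int b = 100000;
        auto id_a = Aws::Utils::HashingUtils::Base64Encode(
                {reinterpret_cast<unsigned char*>(&a), sizeof(a)});
        auto id_b = Aws::Utils::HashingUtils::Base64Encode(
                {reinterpret_cast<unsigned char*>(&b), sizeof(b)});
        // Each ID encodes exactly 4 bytes, so each is 8 base64 characters,
        // whereas std::to_string would yield "1" vs "100000".
        assert(id_a.size() == id_b.size());
        return 0;
    }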
diff --git a/be/src/io/fs/azure_obj_storage_client.h b/be/src/io/fs/azure_obj_storage_client.h
index 0dbe56b0b69..7e8678628bd 100644
--- a/be/src/io/fs/azure_obj_storage_client.h
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -25,10 +25,6 @@ class BlobContainerClient;
namespace doris::io {
-struct AzureCompleteMultiParts : public ObjectCompleteMultiParts {
- std::vector<int> block_ids;
-};
-
class ObjClientHolder;
class AzureObjStorageClient final : public ObjStorageClient {
@@ -44,7 +40,7 @@ public:
int partNum) override;
ObjectStorageResponse complete_multipart_upload(
const ObjectStoragePathOptions& opts,
- const ObjectCompleteMultiParts& completed_parts) override;
+ const std::vector<ObjectCompleteMultiPart>& completed_parts) override;
ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
size_t offset, size_t bytes_read,
diff --git a/be/src/io/fs/obj_storage_client.h b/be/src/io/fs/obj_storage_client.h
index cd3db3c7371..8f31b55705b 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -45,7 +45,10 @@ struct ObjectStoragePathOptions {
std::optional<std::string> upload_id = std::nullopt; // only used for S3 upload
};
-struct ObjectCompleteMultiParts {};
+struct ObjectCompleteMultiPart {
+ int part_num = 0;
+ std::string etag = std::string();
+};
struct ObjectStorageStatus {
int code = 0;
@@ -92,7 +95,7 @@ public:
// After a successful execution, the large file can be accessed in the object storage
virtual ObjectStorageResponse complete_multipart_upload(
const ObjectStoragePathOptions& opts,
- const ObjectCompleteMultiParts& completed_parts) = 0;
+ const std::vector<ObjectCompleteMultiPart>& completed_parts) = 0;
// According to the passed bucket and key, it will access whether the corresponding file exists in the object storage.
// If it exists, it will return the corresponding file size
virtual ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) = 0;
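With the provider-specific subclasses (S3CompleteMultiParts, AzureCompleteMultiParts)
gone, call sites pass one plain vector regardless of backend. A hedged sketch of a
caller against the new interface; client/opts construction is assumed and the etag
values are illustrative (the Azure implementation derives block IDs from part_num and
ignores etags):

    std::vector<doris::io::ObjectCompleteMultiPart> parts;
    parts.push_back({.part_num = 1, .etag = "etag-1"});
    parts.push_back({.part_num = 2, .etag = "etag-2"});
    // client is any doris::io::ObjStorageClient, S3- or Azure-backed
    auto resp = client->complete_multipart_upload(opts, parts);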
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 91b5aace12b..9af34ea8ca8 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -33,18 +33,12 @@
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
-#include "io/fs/err_utils.h"
#include "io/fs/file_writer.h"
#include "io/fs/path.h"
-#include "io/fs/s3_common.h"
#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
#include "io/fs/s3_obj_storage_client.h"
#include "runtime/exec_env.h"
-#include "util/bvar_helper.h"
-#include "util/defer_op.h"
-#include "util/doris_metrics.h"
-#include "util/runtime_profile.h"
#include "util/s3_util.h"
namespace doris::io {
@@ -310,12 +304,8 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
}
s3_bytes_written_total << buf.get_size();
- std::unique_ptr<Aws::S3::Model::CompletedPart> completed_part =
- std::make_unique<Aws::S3::Model::CompletedPart>();
- completed_part->SetPartNumber(part_num);
- const auto& etag = *resp.etag;
- // DCHECK(etag.empty());
- completed_part->SetETag(etag);
+ ObjectCompleteMultiPart completed_part {
+ static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
std::unique_lock<std::mutex> lck {_completed_lock};
_completed_parts.emplace_back(std::move(completed_part));
@@ -330,8 +320,8 @@ Status S3FileWriter::_complete() {
_wait_until_finish("early quit");
return _st;
}
- // upload id is empty means there was no multipart upload
- if (upload_id().empty()) {
+ // When the part num is only one, it means the data is less than 5MB so we can just put it.
+ if (_cur_part_num == 1) {
_wait_until_finish("PutObject");
return _st;
}
@@ -349,10 +339,9 @@ Status S3FileWriter::_complete() {
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
- [](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
+ [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
- auto resp = client->complete_multipart_upload(
- _obj_storage_path_opts, S3CompleteMultiParts {.parts = _completed_parts});
+ auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
return {resp.status.code, std::move(resp.status.msg)};
}
@@ -399,7 +388,7 @@ std::string S3FileWriter::_dump_completed_part() const {
std::stringstream ss;
ss << "part_numbers:";
for (const auto& part : _completed_parts) {
- ss << " " << part->GetPartNumber();
+ ss << " " << part.part_num;
}
return ss.str();
}
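The _complete() change above also replaces the upload_id().empty() test with
_cur_part_num == 1: a writer still on its first part never started a multipart upload
(the data fit in the first 5MB buffer and was sent as a single PutObject), so there is
nothing to complete. A condensed, non-literal sketch of that dispatch:

    // simplified sketch of the decision in S3FileWriter::_complete()
    if (_cur_part_num == 1) {            // all data fit into one buffer (< 5MB)
        _wait_until_finish("PutObject"); // a single PutObject already wrote the object
        return _st;
    }
    // multiple parts were staged: sort them and finish the multipart upload
    std::sort(_completed_parts.begin(), _completed_parts.end(),
              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);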
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 67fc9e26998..c67c79ce536 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -62,9 +62,7 @@ public:
return _cache_builder == nullptr ? nullptr : _cache_builder.get();
}
- const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& completed_parts() const {
- return _completed_parts;
- }
+ const std::vector<ObjectCompleteMultiPart>& completed_parts() const { return _completed_parts; }
const std::string& key() const { return _obj_storage_path_opts.key; }
const std::string& bucket() const { return _obj_storage_path_opts.bucket; }
@@ -92,7 +90,7 @@ private:
// Current Part Num for CompletedPart
int _cur_part_num = 1;
std::mutex _completed_lock;
- std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
+ std::vector<ObjectCompleteMultiPart> _completed_parts;
// **Attention** call add_count() before submitting buf to async thread pool
bthread::CountdownEvent _countdown_event {0};
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index ffe652583c8..c6cda174833 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -179,15 +179,20 @@ ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStorageP
}
ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
- const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
+ const ObjectStoragePathOptions& opts,
+ const std::vector<ObjectCompleteMultiPart>& completed_parts) {
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
CompletedMultipartUpload completed_upload;
- const auto aws_complete_parts = static_cast<const S3CompleteMultiParts&>(completed_parts);
std::vector<CompletedPart> complete_parts;
- std::ranges::transform(aws_complete_parts.parts, std::back_inserter(complete_parts),
- [](auto&& part_ptr) { return *part_ptr; });
+ std::ranges::transform(completed_parts, std::back_inserter(complete_parts),
+ [](const ObjectCompleteMultiPart& part_ptr) {
+ CompletedPart part;
+ part.SetPartNumber(part_ptr.part_num);
+ part.SetETag(part_ptr.etag);
+ return part;
+ });
completed_upload.SetParts(std::move(complete_parts));
complete_request.WithMultipartUpload(completed_upload);
diff --git a/be/src/io/fs/s3_obj_storage_client.h b/be/src/io/fs/s3_obj_storage_client.h
index 0bc2d5ef5af..ebc81b81b40 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -28,11 +28,6 @@ class CompletedPart;
} // namespace Aws::S3
namespace doris::io {
-
-struct S3CompleteMultiParts : public ObjectCompleteMultiParts {
- std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& parts;
-};
-
class ObjClientHolder;
class S3ObjStorageClient final : public ObjStorageClient {
@@ -47,7 +42,7 @@ public:
int partNum) override;
ObjectStorageResponse complete_multipart_upload(
const ObjectStoragePathOptions& opts,
- const ObjectCompleteMultiParts& completed_parts) override;
+ const std::vector<ObjectCompleteMultiPart>& completed_parts) override;
ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
size_t offset, size_t bytes_read,
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index e89366f3ab8..630a998bd47 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -73,7 +73,18 @@ bool to_int(std::string_view str, int& res) {
return ec == std::errc {};
}
-const std::string USE_PATH_STYLE = "use_path_style";
+constexpr char USE_PATH_STYLE[] = "use_path_style";
+
+constexpr char AZURE_PROVIDER_STRING[] = "AZURE";
+constexpr char S3_PROVIDER[] = "provider";
+constexpr char S3_AK[] = "AWS_ACCESS_KEY";
+constexpr char S3_SK[] = "AWS_SECRET_KEY";
+constexpr char S3_ENDPOINT[] = "AWS_ENDPOINT";
+constexpr char S3_REGION[] = "AWS_REGION";
+constexpr char S3_TOKEN[] = "AWS_TOKEN";
+constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONN_SIZE";
+constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
+constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";
} // namespace
S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
@@ -196,6 +207,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
config::s3_client_http_scheme, s3_conf.ak, container_name);
auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
+ LOG_INFO("create one azure client with {}", s3_conf.to_string());
return std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
}
@@ -264,6 +276,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
}
auto obj_client = std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
+ LOG_INFO("create one s3 client with {}", s3_conf.to_string());
return obj_client;
}
@@ -302,12 +315,19 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
it->second);
}
}
+ if (auto it = properties.find(S3_PROVIDER); it != properties.end()) {
+ if (0 == strcmp(it->second.c_str(), AZURE_PROVIDER_STRING)) {
+ s3_conf->client_conf.provider = io::ObjStorageType::AZURE;
+ }
+ }
if (s3_uri.get_bucket().empty()) {
return Status::InvalidArgument("Invalid S3 URI {}, bucket is not specified",
s3_uri.to_string());
}
s3_conf->bucket = s3_uri.get_bucket();
+ // For azure's compatibility
+ s3_conf->client_conf.bucket = s3_uri.get_bucket();
s3_conf->prefix = "";
// See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
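The provider detection added above keys off a literal "provider" property compared
case-sensitively against "AZURE". A hedged example of a property map that would now
route object access to the Azure client; the key names come from the constants defined
earlier in this file, and all values are illustrative:

    #include <map>
    #include <string>

    std::map<std::string, std::string> properties = {
            {"provider", "AZURE"},                                       // -> ObjStorageType::AZURE
            {"AWS_ACCESS_KEY", "mystorageaccount"},                      // illustrative
            {"AWS_SECRET_KEY", "base64-account-key"},                    // illustrative
            {"AWS_ENDPOINT", "mystorageaccount.blob.core.windows.net"},  // illustrative
            {"AWS_REGION", "placeholder-region"},                        // illustrative
    };
    // convert_properties_to_s3_conf(...) then sets client_conf.provider to
    // io::ObjStorageType::AZURE and also copies the bucket into client_conf.bucket.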
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 1764b1b8b86..5b0e9c09200 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -81,15 +81,6 @@ inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
#define DO_S3_GET_RATE_LIMIT(code) DO_S3_RATE_LIMIT(S3RateLimitType::GET, code)
-const static std::string S3_AK = "AWS_ACCESS_KEY";
-const static std::string S3_SK = "AWS_SECRET_KEY";
-const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
-const static std::string S3_REGION = "AWS_REGION";
-const static std::string S3_TOKEN = "AWS_TOKEN";
-const static std::string S3_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
-const static std::string S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
-const static std::string S3_CONN_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS";
-
struct S3ClientConf {
std::string endpoint;
std::string region;
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index f65eb0d0972..abf1f9007a0 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -190,7 +190,7 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
std::map<int, std::string> etags;
for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
- etags.insert({completed_part->GetPartNumber(), completed_part->GetETag()});
+ etags.insert({completed_part.part_num, completed_part.etag});
}
s3_mpu_pending_upload.__set_etags(etags);
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]