szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1268163955
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -29,9 +29,51 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::aws::processors {
+namespace {
+class ReadCallback {
+ public:
+ ReadCallback(uint64_t flow_size, const
minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper&
s3_wrapper,
+ uint64_t multipart_threshold, uint64_t multipart_size,
core::logging::Logger& logger)
+ : flow_size_(flow_size),
+ options_(options),
+ s3_wrapper_(s3_wrapper),
+ multipart_threshold_(multipart_threshold),
+ multipart_size_(multipart_size),
+ logger_(logger) {
+ }
+
+ int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
+ try {
+ if (flow_size_ <= multipart_threshold_) {
Review Comment:
I'd change this to "less-than": it feels more intuitive for a flow file whose
size exactly hits the threshold to already be uploaded as multipart.
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
}
}
+void PutS3Object::ageOffMultipartUploads(const CommonProperties
&common_properties) {
+ const auto now = std::chrono::system_clock::now();
+ if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+ logger_->log_debug("Multipart Upload Age off interval still in progress,
not checking obsolete multipart uploads.");
+ return;
+ }
+
+ logger_->log_trace("Listing aged off multipart uploads still in progress.");
+ aws::s3::ListMultipartUploadsRequestParameters
list_params(common_properties.credentials, *client_config_);
+ list_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url);
+ list_params.bucket = common_properties.bucket;
+ list_params.age_off_limit = multipart_upload_max_age_threshold_;
+ list_params.use_virtual_addressing = use_virtual_addressing_;
+ auto aged_off_uploads_in_progress =
s3_wrapper_.listMultipartUploads(list_params);
+ if (!aged_off_uploads_in_progress) {
+ logger_->log_error("Listing aged off multipart uploads failed!");
+ return;
+ }
+
+ logger_->log_info("Found %d aged off pending multipart upload jobs in bucket
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+ size_t aborted = 0;
+ for (const auto& upload : *aged_off_uploads_in_progress) {
+ logger_->log_info("Aborting multipart upload with key '%s' and upload id
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
Review Comment:
This log message should include the reason for the abort, to avoid confusion.
##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public
minifi::utils::ListedObject {
using HeadObjectRequestParameters = GetObjectRequestParameters;
using GetObjectTagsParameters = DeleteObjectRequestParameters;
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+ ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::optional<std::chrono::milliseconds> age_off_limit; // if set, only
list the aged off uploads
+ bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+ std::string key;
+ std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+ AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::string key;
+ std::string upload_id;
+ bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+ explicit StreamReadException(const std::string& error) :
Exception(GENERAL_EXCEPTION, error) {}
+};
+
class S3Wrapper {
public:
+ static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
S3Wrapper();
explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
- std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+ std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t
flow_size);
+ std::optional<PutObjectResult> putObjectMultipart(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t
multipart_size);
bool deleteObject(const DeleteObjectRequestParameters& params);
std::optional<GetObjectResult> getObject(const GetObjectRequestParameters&
get_object_params, io::OutputStream& out_body);
std::optional<std::vector<ListedObjectAttributes>> listBucket(const
ListRequestParameters& params);
std::optional<std::map<std::string, std::string>> getObjectTags(const
GetObjectTagsParameters& params);
std::optional<HeadObjectResult> headObject(const
HeadObjectRequestParameters& head_object_params);
+ std::optional<std::vector<MultipartUpload>> listMultipartUploads(const
ListMultipartUploadsRequestParameters& params);
+ bool abortMultipartUpload(const AbortMultipartUploadRequestParameters&
params);
+ void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds
multipart_upload_max_age_threshold);
+ void
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
state_manager);
virtual ~S3Wrapper() = default;
private:
+ struct UploadPartsResult {
+ std::string upload_id;
+ std::vector<std::string> part_etags;
+ };
+
static Expiration getExpiration(const std::string& expiration);
- void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const
std::string& canned_acl) const;
+ template<typename RequestType>
+ void setCannedAcl(RequestType& request, const std::string& canned_acl) const
{
+ if (canned_acl.empty()) return;
+
+ const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto&
kv) { return kv.first; });
+ if (it == CANNED_ACL_MAP.end()) return;
+
+ logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+ request.SetACL(it->second);
+ }
+
+ template<typename RequestType>
+ RequestType createPutObjectRequest(const PutObjectRequestParameters&
put_object_params) {
+ RequestType request;
+ request.WithBucket(put_object_params.bucket)
+ .WithKey(put_object_params.object_key)
+ .WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP,
put_object_params.storage_class))
+ .WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP,
put_object_params.server_side_encryption))
+ .WithMetadata(put_object_params.user_metadata_map)
+ .WithGrantFullControl(put_object_params.fullcontrol_user_list)
+ .WithGrantRead(put_object_params.read_permission_user_list)
+ .WithGrantReadACP(put_object_params.read_acl_user_list)
+ .WithGrantWriteACP(put_object_params.write_acl_user_list);
+ request.SetContentType(put_object_params.content_type);
+ setCannedAcl<RequestType>(request, put_object_params.canned_acl);
+ return request;
+ }
+
+ template<typename ResultType>
+ PutObjectResult createPutObjectResult(const ResultType& upload_result) {
+ PutObjectResult put_object_result;
+ // Etags are returned by AWS in quoted form that should be removed
+ put_object_result.etag =
minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(),
'"');
+ put_object_result.version = upload_result.GetVersionId();
+
+ // GetExpiration returns a string pair with a date and a ruleid in
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+ // s3.expiration only needs the date member of this pair
+ put_object_result.expiration =
getExpiration(upload_result.GetExpiration()).expiration_time;
+ put_object_result.ssealgorithm =
getEncryptionString(upload_result.GetServerSideEncryption());
+ return put_object_result;
+ }
Review Comment:
```suggestion
PutObjectResult createPutObjectResult(const auto& upload_result) {
return {
// Etags are returned by AWS in quoted form that should be removed
.etag =
minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(),
'"'),
.version = upload_result.GetVersionId(),
// GetExpiration returns a string pair with a date and a ruleid in
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
// s3.expiration only needs the date member of this pair
.expiration =
getExpiration(upload_result.GetExpiration()).expiration_time,
.ssealgorithm =
getEncryptionString(upload_result.GetServerSideEncryption())
};
}
```
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -298,4 +407,58 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const
GetObjectRequestParamet
return result;
}
+void S3Wrapper::addListMultipartUploadResults(const
Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads,
std::optional<std::chrono::milliseconds> age_off_limit,
+ std::vector<MultipartUpload>& filtered_uploads) {
+ const auto now = Aws::Utils::DateTime::Now();
+ for (const auto& upload : uploads) {
+ // if age_off_limit is set only list the aged off uploads
+ if (age_off_limit && now - upload.GetInitiated() <= *age_off_limit) {
+ logger_->log_debug("Multipart upload with key '%s' and upload id '%s'
has not aged off yet", upload.GetKey(), upload.GetUploadId());
+ continue;
+ }
+
+ MultipartUpload filtered_upload;
+ filtered_upload.key = upload.GetKey();
+ filtered_upload.upload_id = upload.GetUploadId();
+ filtered_uploads.push_back(filtered_upload);
Review Comment:
I'd do this for simplicity
```suggestion
filtered_uploads.push_back({.key = upload.GetKey(), .upload_id =
upload.GetUploadId()});
```
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
}
}
+void PutS3Object::ageOffMultipartUploads(const CommonProperties
&common_properties) {
+ const auto now = std::chrono::system_clock::now();
+ if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+ logger_->log_debug("Multipart Upload Age off interval still in progress,
not checking obsolete multipart uploads.");
+ return;
+ }
+
+ logger_->log_trace("Listing aged off multipart uploads still in progress.");
+ aws::s3::ListMultipartUploadsRequestParameters
list_params(common_properties.credentials, *client_config_);
+ list_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url);
+ list_params.bucket = common_properties.bucket;
+ list_params.age_off_limit = multipart_upload_max_age_threshold_;
+ list_params.use_virtual_addressing = use_virtual_addressing_;
+ auto aged_off_uploads_in_progress =
s3_wrapper_.listMultipartUploads(list_params);
+ if (!aged_off_uploads_in_progress) {
+ logger_->log_error("Listing aged off multipart uploads failed!");
+ return;
+ }
+
+ logger_->log_info("Found %d aged off pending multipart upload jobs in bucket
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+ size_t aborted = 0;
+ for (const auto& upload : *aged_off_uploads_in_progress) {
+ logger_->log_info("Aborting multipart upload with key '%s' and upload id
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+ aws::s3::AbortMultipartUploadRequestParameters
abort_params(common_properties.credentials, *client_config_);
+ abort_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url);
+ abort_params.bucket = common_properties.bucket;
+ abort_params.key = upload.key;
+ abort_params.upload_id = upload.upload_id;
+ abort_params.use_virtual_addressing = use_virtual_addressing_;
+ if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+ logger_->log_error("Failed to abort multipart upload with key '%s' and
upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id,
abort_params.bucket);
+ continue;
+ }
+ ++aborted;
+ }
+ if (aborted > 0) {
+ logger_->log_info("Aborted %d pending multipart upload jobs in bucket
'%s'", aborted, common_properties.bucket);
+ }
+
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+ last_ageoff_time_ = now;
Review Comment:
Shouldn't this synchronize the whole aged off block of data between the early
last_ageoff_time and the early now, doing a cmpxchgq at the start and checking
the interval for expired ageoff parts?
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -263,17 +372,17 @@ std::optional<HeadObjectResult>
S3Wrapper::headObject(const HeadObjectRequestPar
template<typename ListRequest>
ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
ListRequest request;
- request.SetBucket(params.bucket);
- request.SetDelimiter(params.delimiter);
- request.SetPrefix(params.prefix);
+ request.WithBucket(params.bucket)
+ .WithDelimiter(params.delimiter)
+ .WithPrefix(params.prefix);
Review Comment:
I'm not sure how safe it is long-term to rely on `With*` functions modifying
the state in-place. I'd do something like this instead:
```c++
return ListRequest{}
.WithBucket(params.bucket)
.WithDelimiter(params.delimiter)
.WithPrefix(params.prefix);
```
##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public
minifi::utils::ListedObject {
using HeadObjectRequestParameters = GetObjectRequestParameters;
using GetObjectTagsParameters = DeleteObjectRequestParameters;
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+ ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::optional<std::chrono::milliseconds> age_off_limit; // if set, only
list the aged off uploads
+ bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+ std::string key;
+ std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+ AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::string key;
+ std::string upload_id;
+ bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+ explicit StreamReadException(const std::string& error) :
Exception(GENERAL_EXCEPTION, error) {}
+};
+
class S3Wrapper {
public:
+ static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
S3Wrapper();
explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
- std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+ std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t
flow_size);
+ std::optional<PutObjectResult> putObjectMultipart(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t
multipart_size);
bool deleteObject(const DeleteObjectRequestParameters& params);
std::optional<GetObjectResult> getObject(const GetObjectRequestParameters&
get_object_params, io::OutputStream& out_body);
std::optional<std::vector<ListedObjectAttributes>> listBucket(const
ListRequestParameters& params);
std::optional<std::map<std::string, std::string>> getObjectTags(const
GetObjectTagsParameters& params);
std::optional<HeadObjectResult> headObject(const
HeadObjectRequestParameters& head_object_params);
+ std::optional<std::vector<MultipartUpload>> listMultipartUploads(const
ListMultipartUploadsRequestParameters& params);
+ bool abortMultipartUpload(const AbortMultipartUploadRequestParameters&
params);
+ void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds
multipart_upload_max_age_threshold);
+ void
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
state_manager);
virtual ~S3Wrapper() = default;
private:
+ struct UploadPartsResult {
+ std::string upload_id;
+ std::vector<std::string> part_etags;
+ };
+
static Expiration getExpiration(const std::string& expiration);
- void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const
std::string& canned_acl) const;
+ template<typename RequestType>
+ void setCannedAcl(RequestType& request, const std::string& canned_acl) const
{
+ if (canned_acl.empty()) return;
+
+ const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto&
kv) { return kv.first; });
+ if (it == CANNED_ACL_MAP.end()) return;
+
+ logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+ request.SetACL(it->second);
+ }
+
+ template<typename RequestType>
+ RequestType createPutObjectRequest(const PutObjectRequestParameters&
put_object_params) {
+ RequestType request;
+ request.WithBucket(put_object_params.bucket)
Review Comment:
```suggestion
auto request = RequestType{}.WithBucket(put_object_params.bucket)
```
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -235,8 +344,8 @@ std::optional<std::vector<ListedObjectAttributes>>
S3Wrapper::listBucket(const L
std::optional<std::map<std::string, std::string>>
S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
Aws::S3::Model::GetObjectTaggingRequest request;
- request.SetBucket(params.bucket);
- request.SetKey(params.object_key);
+ request.WithBucket(params.bucket)
+ .WithKey(params.object_key);
Review Comment:
In this case, I'd write it like this:
```c++
auto request =
Aws::S3::Model::GetObjectTaggingRequest{}.WithBucket(params.bucket).WithKey(params.object_key);
```
Or, to modify in-place, I'd add self-assignment, just in case it stops
returning a reference to the modified `*this` and starts returning a new object:
```c++
request = request.WithBucket(params.bucket);
```
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -200,13 +306,15 @@ void PutS3Object::onTrigger(const
std::shared_ptr<core::ProcessContext> &context
return;
}
+ ageOffMultipartUploads(*common_properties);
+
auto put_s3_request_params = buildPutS3RequestParams(context, flow_file,
*common_properties);
if (!put_s3_request_params) {
session->transfer(flow_file, Failure);
return;
}
- PutS3Object::ReadCallback callback(flow_file->getSize(),
*put_s3_request_params, s3_wrapper_);
+ ReadCallback callback(flow_file->getSize(), *put_s3_request_params,
s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);
Review Comment:
You could also inject `this` to keep things simpler, or replace the whole
callback class with an inline lambda with `[this, &flow_file,
&put_s3_request_params]` capture.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]