lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1273249255
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
}
}
+void PutS3Object::ageOffMultipartUploads(const CommonProperties
&common_properties) {
+ const auto now = std::chrono::system_clock::now();
+ if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+ logger_->log_debug("Multipart Upload Age off interval still in progress,
not checking obsolete multipart uploads.");
+ return;
+ }
+
+ logger_->log_trace("Listing aged off multipart uploads still in progress.");
+ aws::s3::ListMultipartUploadsRequestParameters
list_params(common_properties.credentials, *client_config_);
+ list_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url);
+ list_params.bucket = common_properties.bucket;
+ list_params.age_off_limit = multipart_upload_max_age_threshold_;
+ list_params.use_virtual_addressing = use_virtual_addressing_;
+ auto aged_off_uploads_in_progress =
s3_wrapper_.listMultipartUploads(list_params);
+ if (!aged_off_uploads_in_progress) {
+ logger_->log_error("Listing aged off multipart uploads failed!");
+ return;
+ }
+
+ logger_->log_info("Found %d aged off pending multipart upload jobs in bucket
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+ size_t aborted = 0;
+ for (const auto& upload : *aged_off_uploads_in_progress) {
+ logger_->log_info("Aborting multipart upload with key '%s' and upload id
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
Review Comment:
Good point, updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
}
}
+void PutS3Object::ageOffMultipartUploads(const CommonProperties
&common_properties) {
+ const auto now = std::chrono::system_clock::now();
+ if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+ logger_->log_debug("Multipart Upload Age off interval still in progress,
not checking obsolete multipart uploads.");
+ return;
+ }
+
+ logger_->log_trace("Listing aged off multipart uploads still in progress.");
+ aws::s3::ListMultipartUploadsRequestParameters
list_params(common_properties.credentials, *client_config_);
+ list_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url);
+ list_params.bucket = common_properties.bucket;
+ list_params.age_off_limit = multipart_upload_max_age_threshold_;
+ list_params.use_virtual_addressing = use_virtual_addressing_;
+ auto aged_off_uploads_in_progress =
s3_wrapper_.listMultipartUploads(list_params);
+ if (!aged_off_uploads_in_progress) {
+ logger_->log_error("Listing aged off multipart uploads failed!");
+ return;
+ }
+
+ logger_->log_info("Found %d aged off pending multipart upload jobs in bucket
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+ size_t aborted = 0;
+ for (const auto& upload : *aged_off_uploads_in_progress) {
+ logger_->log_info("Aborting multipart upload with key '%s' and upload id
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+ aws::s3::AbortMultipartUploadRequestParameters
abort_params(common_properties.credentials, *client_config_);
+ abort_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url);
+ abort_params.bucket = common_properties.bucket;
+ abort_params.key = upload.key;
+ abort_params.upload_id = upload.upload_id;
+ abort_params.use_virtual_addressing = use_virtual_addressing_;
+ if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+ logger_->log_error("Failed to abort multipart upload with key '%s' and
upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id,
abort_params.bucket);
+ continue;
+ }
+ ++aborted;
+ }
+ if (aborted > 0) {
+ logger_->log_info("Aborted %d pending multipart upload jobs in bucket
'%s'", aborted, common_properties.bucket);
+ }
+
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+ last_ageoff_time_ = now;
Review Comment:
I didn't think it was necessary, because in a scenario with overlapping
onTriggers the worst case would be that the aged-off uploads are
requested to be deleted multiple times. Of course it could be improved to avoid
this, but I didn't see how it could be done with a single atomic
`compare_exchange_strong` call, so I removed the atomic variable and added a mutex
for this block instead in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -200,13 +306,15 @@ void PutS3Object::onTrigger(const
std::shared_ptr<core::ProcessContext> &context
return;
}
+ ageOffMultipartUploads(*common_properties);
+
auto put_s3_request_params = buildPutS3RequestParams(context, flow_file,
*common_properties);
if (!put_s3_request_params) {
session->transfer(flow_file, Failure);
return;
}
- PutS3Object::ReadCallback callback(flow_file->getSize(),
*put_s3_request_params, s3_wrapper_);
+ ReadCallback callback(flow_file->getSize(), *put_s3_request_params,
s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);
Review Comment:
You are right: at this point the ReadCallback takes too many parameters,
and it is simpler to use a single lambda. Updated in
bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -263,17 +372,17 @@ std::optional<HeadObjectResult>
S3Wrapper::headObject(const HeadObjectRequestPar
template<typename ListRequest>
ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
ListRequest request;
- request.SetBucket(params.bucket);
- request.SetDelimiter(params.delimiter);
- request.SetPrefix(params.prefix);
+ request.WithBucket(params.bucket)
+ .WithDelimiter(params.delimiter)
+ .WithPrefix(params.prefix);
Review Comment:
According to examples I found, such as
[this one](https://github.com/aws/aws-sdk-cpp/blob/6cc48868b7558265fd095ce338ce37320a2968e2/tests/aws-cpp-sdk-s3-crt-integration-tests/BucketAndObjectOperationTest.cpp#L400),
it should be safe: the `With*` methods appear to be equivalent to the `Set*` methods.
The only difference is that they return a reference to the request, which makes it simpler to
[chain these calls together](https://github.com/aws/aws-sdk-cpp/blob/6cc48868b7558265fd095ce338ce37320a2968e2/tests/aws-cpp-sdk-cloudfront-integration-tests/CloudfrontOperationTest.cpp#L182).
However, the version you suggested looks better in my opinion, so I updated the
`With*` calls in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -235,8 +344,8 @@ std::optional<std::vector<ListedObjectAttributes>>
S3Wrapper::listBucket(const L
std::optional<std::map<std::string, std::string>>
S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
Aws::S3::Model::GetObjectTaggingRequest request;
- request.SetBucket(params.bucket);
- request.SetKey(params.object_key);
+ request.WithBucket(params.bucket)
+ .WithKey(params.object_key);
Review Comment:
Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -298,4 +407,58 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const
GetObjectRequestParamet
return result;
}
+void S3Wrapper::addListMultipartUploadResults(const
Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads,
std::optional<std::chrono::milliseconds> age_off_limit,
+ std::vector<MultipartUpload>& filtered_uploads) {
+ const auto now = Aws::Utils::DateTime::Now();
+ for (const auto& upload : uploads) {
+ // if age_off_limit is set only list the aged off uploads
+ if (age_off_limit && now - upload.GetInitiated() <= *age_off_limit) {
+ logger_->log_debug("Multipart upload with key '%s' and upload id '%s'
has not aged off yet", upload.GetKey(), upload.GetUploadId());
+ continue;
+ }
+
+ MultipartUpload filtered_upload;
+ filtered_upload.key = upload.GetKey();
+ filtered_upload.upload_id = upload.GetUploadId();
+ filtered_uploads.push_back(filtered_upload);
Review Comment:
Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public
minifi::utils::ListedObject {
using HeadObjectRequestParameters = GetObjectRequestParameters;
using GetObjectTagsParameters = DeleteObjectRequestParameters;
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+ ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::optional<std::chrono::milliseconds> age_off_limit; // if set, only
list the aged off uploads
+ bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+ std::string key;
+ std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+ AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::string key;
+ std::string upload_id;
+ bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+ explicit StreamReadException(const std::string& error) :
Exception(GENERAL_EXCEPTION, error) {}
+};
+
class S3Wrapper {
public:
+ static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
S3Wrapper();
explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
- std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+ std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t
flow_size);
+ std::optional<PutObjectResult> putObjectMultipart(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t
multipart_size);
bool deleteObject(const DeleteObjectRequestParameters& params);
std::optional<GetObjectResult> getObject(const GetObjectRequestParameters&
get_object_params, io::OutputStream& out_body);
std::optional<std::vector<ListedObjectAttributes>> listBucket(const
ListRequestParameters& params);
std::optional<std::map<std::string, std::string>> getObjectTags(const
GetObjectTagsParameters& params);
std::optional<HeadObjectResult> headObject(const
HeadObjectRequestParameters& head_object_params);
+ std::optional<std::vector<MultipartUpload>> listMultipartUploads(const
ListMultipartUploadsRequestParameters& params);
+ bool abortMultipartUpload(const AbortMultipartUploadRequestParameters&
params);
+ void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds
multipart_upload_max_age_threshold);
+ void
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
state_manager);
virtual ~S3Wrapper() = default;
private:
+ struct UploadPartsResult {
+ std::string upload_id;
+ std::vector<std::string> part_etags;
+ };
+
static Expiration getExpiration(const std::string& expiration);
- void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const
std::string& canned_acl) const;
+ template<typename RequestType>
+ void setCannedAcl(RequestType& request, const std::string& canned_acl) const
{
+ if (canned_acl.empty()) return;
+
+ const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto&
kv) { return kv.first; });
+ if (it == CANNED_ACL_MAP.end()) return;
+
+ logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+ request.SetACL(it->second);
+ }
+
+ template<typename RequestType>
+ RequestType createPutObjectRequest(const PutObjectRequestParameters&
put_object_params) {
+ RequestType request;
+ request.WithBucket(put_object_params.bucket)
Review Comment:
Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public
minifi::utils::ListedObject {
using HeadObjectRequestParameters = GetObjectRequestParameters;
using GetObjectTagsParameters = DeleteObjectRequestParameters;
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+ ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::optional<std::chrono::milliseconds> age_off_limit; // if set, only
list the aged off uploads
+ bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+ std::string key;
+ std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+ AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials&
creds, const Aws::Client::ClientConfiguration& config)
+ : RequestParameters(creds, config) {}
+ std::string bucket;
+ std::string key;
+ std::string upload_id;
+ bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+ explicit StreamReadException(const std::string& error) :
Exception(GENERAL_EXCEPTION, error) {}
+};
+
class S3Wrapper {
public:
+ static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
S3Wrapper();
explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
- std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+ std::optional<PutObjectResult> putObject(const PutObjectRequestParameters&
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t
flow_size);
+ std::optional<PutObjectResult> putObjectMultipart(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t
multipart_size);
bool deleteObject(const DeleteObjectRequestParameters& params);
std::optional<GetObjectResult> getObject(const GetObjectRequestParameters&
get_object_params, io::OutputStream& out_body);
std::optional<std::vector<ListedObjectAttributes>> listBucket(const
ListRequestParameters& params);
std::optional<std::map<std::string, std::string>> getObjectTags(const
GetObjectTagsParameters& params);
std::optional<HeadObjectResult> headObject(const
HeadObjectRequestParameters& head_object_params);
+ std::optional<std::vector<MultipartUpload>> listMultipartUploads(const
ListMultipartUploadsRequestParameters& params);
+ bool abortMultipartUpload(const AbortMultipartUploadRequestParameters&
params);
+ void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds
multipart_upload_max_age_threshold);
+ void
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
state_manager);
virtual ~S3Wrapper() = default;
private:
+ struct UploadPartsResult {
+ std::string upload_id;
+ std::vector<std::string> part_etags;
+ };
+
static Expiration getExpiration(const std::string& expiration);
- void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const
std::string& canned_acl) const;
+ template<typename RequestType>
+ void setCannedAcl(RequestType& request, const std::string& canned_acl) const
{
+ if (canned_acl.empty()) return;
+
+ const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto&
kv) { return kv.first; });
+ if (it == CANNED_ACL_MAP.end()) return;
+
+ logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+ request.SetACL(it->second);
+ }
+
+ template<typename RequestType>
+ RequestType createPutObjectRequest(const PutObjectRequestParameters&
put_object_params) {
+ RequestType request;
+ request.WithBucket(put_object_params.bucket)
+ .WithKey(put_object_params.object_key)
+ .WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP,
put_object_params.storage_class))
+ .WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP,
put_object_params.server_side_encryption))
+ .WithMetadata(put_object_params.user_metadata_map)
+ .WithGrantFullControl(put_object_params.fullcontrol_user_list)
+ .WithGrantRead(put_object_params.read_permission_user_list)
+ .WithGrantReadACP(put_object_params.read_acl_user_list)
+ .WithGrantWriteACP(put_object_params.write_acl_user_list);
+ request.SetContentType(put_object_params.content_type);
+ setCannedAcl<RequestType>(request, put_object_params.canned_acl);
+ return request;
+ }
+
+ template<typename ResultType>
+ PutObjectResult createPutObjectResult(const ResultType& upload_result) {
+ PutObjectResult put_object_result;
+ // Etags are returned by AWS in quoted form that should be removed
+ put_object_result.etag =
minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(),
'"');
+ put_object_result.version = upload_result.GetVersionId();
+
+ // GetExpiration returns a string pair with a date and a ruleid in
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+ // s3.expiration only needs the date member of this pair
+ put_object_result.expiration =
getExpiration(upload_result.GetExpiration()).expiration_time;
+ put_object_result.ssealgorithm =
getEncryptionString(upload_result.GetServerSideEncryption());
+ return put_object_result;
+ }
Review Comment:
Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]