lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1273249255


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties 
&common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, 
not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters 
list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, 
common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = 
s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket 
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id 
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);

Review Comment:
   Good point, updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties 
&common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, 
not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters 
list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, 
common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = 
s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket 
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id 
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters 
abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, 
common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and 
upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, 
abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket 
'%s'", aborted, common_properties.bucket);
+  }
+  
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   I didn't think it was necessary, because in a scenario where there are 
overlapping onTriggers the worst case would be that the aged off uploads are 
requested to be deleted multiple times. Of course it could be improved to avoid 
this scenario. I didn't see how this could be done with a single atomic 
`compare_and_exchange` call, so I removed the atomic variable and added a mutex 
for this block instead in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -200,13 +306,15 @@ void PutS3Object::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
+  ageOffMultipartUploads(*common_properties);
+
   auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, 
*common_properties);
   if (!put_s3_request_params) {
     session->transfer(flow_file, Failure);
     return;
   }
 
-  PutS3Object::ReadCallback callback(flow_file->getSize(), 
*put_s3_request_params, s3_wrapper_);
+  ReadCallback callback(flow_file->getSize(), *put_s3_request_params, 
s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);

Review Comment:
   You are right, at this point this ReadCallback is getting too many 
parameters and it is simpler to have a single lambda. Updated in 
bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -263,17 +372,17 @@ std::optional<HeadObjectResult> 
S3Wrapper::headObject(const HeadObjectRequestPar
 template<typename ListRequest>
 ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
   ListRequest request;
-  request.SetBucket(params.bucket);
-  request.SetDelimiter(params.delimiter);
-  request.SetPrefix(params.prefix);
+  request.WithBucket(params.bucket)
+    .WithDelimiter(params.delimiter)
+    .WithPrefix(params.prefix);

Review Comment:
   According to the examples I saw, like 
[this](https://github.com/aws/aws-sdk-cpp/blob/6cc48868b7558265fd095ce338ce37320a2968e2/tests/aws-cpp-sdk-s3-crt-integration-tests/BucketAndObjectOperationTest.cpp#L400)
 it should be safe, as it looks to be equivalent to the `Set*` methods; the 
only difference is that returning references makes it simpler to [chain 
these calls 
together](https://github.com/aws/aws-sdk-cpp/blob/6cc48868b7558265fd095ce338ce37320a2968e2/tests/aws-cpp-sdk-cloudfront-integration-tests/CloudfrontOperationTest.cpp#L182).
 But the version you suggested looks better in my opinion, so I updated the 
`With*` calls in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -235,8 +344,8 @@ std::optional<std::vector<ListedObjectAttributes>> 
S3Wrapper::listBucket(const L
 
 std::optional<std::map<std::string, std::string>> 
S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
   Aws::S3::Model::GetObjectTaggingRequest request;
-  request.SetBucket(params.bucket);
-  request.SetKey(params.object_key);
+  request.WithBucket(params.bucket)
+    .WithKey(params.object_key);

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -298,4 +407,58 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const 
GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const 
Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, 
std::optional<std::chrono::milliseconds> age_off_limit,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    // if age_off_limit is set only list the aged off uploads
+    if (age_off_limit && now - upload.GetInitiated() <= *age_off_limit) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' 
has not aged off yet", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public 
minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only 
list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : 
Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t 
flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t 
multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& 
get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const 
ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const 
GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const 
HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const 
ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& 
params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds 
multipart_upload_max_age_threshold);
+  void 
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
 state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const 
{
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& 
kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& 
put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public 
minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only 
list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : 
Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t 
flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t 
multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& 
get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const 
ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const 
GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const 
HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const 
ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& 
params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds 
multipart_upload_max_age_threshold);
+  void 
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
 state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const 
{
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& 
kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& 
put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP, 
put_object_params.storage_class))
+      .WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP, 
put_object_params.server_side_encryption))
+      .WithMetadata(put_object_params.user_metadata_map)
+      .WithGrantFullControl(put_object_params.fullcontrol_user_list)
+      .WithGrantRead(put_object_params.read_permission_user_list)
+      .WithGrantReadACP(put_object_params.read_acl_user_list)
+      .WithGrantWriteACP(put_object_params.write_acl_user_list);
+    request.SetContentType(put_object_params.content_type);
+    setCannedAcl<RequestType>(request, put_object_params.canned_acl);
+    return request;
+  }
+
+  template<typename ResultType>
+  PutObjectResult createPutObjectResult(const ResultType& upload_result) {
+    PutObjectResult put_object_result;
+    // Etags are returned by AWS in quoted form that should be removed
+    put_object_result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), 
'"');
+    put_object_result.version = upload_result.GetVersionId();
+
+    // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+    // s3.expiration only needs the date member of this pair
+    put_object_result.expiration = 
getExpiration(upload_result.GetExpiration()).expiration_time;
+    put_object_result.ssealgorithm = 
getEncryptionString(upload_result.GetServerSideEncryption());
+    return put_object_result;
+  }

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to