szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1268163955


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -29,9 +29,51 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::aws::processors {
 
+namespace {
+class ReadCallback {
+ public:
+  ReadCallback(uint64_t flow_size, const 
minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& 
s3_wrapper,
+        uint64_t multipart_threshold, uint64_t multipart_size, 
core::logging::Logger& logger)
+    : flow_size_(flow_size),
+      options_(options),
+      s3_wrapper_(s3_wrapper),
+      multipart_threshold_(multipart_threshold),
+      multipart_size_(multipart_size),
+      logger_(logger) {
+  }
+
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
+    try {
+      if (flow_size_ <= multipart_threshold_) {

Review Comment:
   I'd change this to "less-than": it feels more intuitive for a flow file whose 
size exactly hits the threshold to already be uploaded as multipart.



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties 
&common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, 
not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters 
list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, 
common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = 
s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket 
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id 
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);

Review Comment:
   This log message should include the reason for the abort, to avoid confusion.



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public 
minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only 
list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : 
Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t 
flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t 
multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& 
get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const 
ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const 
GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const 
HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const 
ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& 
params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds 
multipart_upload_max_age_threshold);
+  void 
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
 state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const 
{
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& 
kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& 
put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP, 
put_object_params.storage_class))
+      .WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP, 
put_object_params.server_side_encryption))
+      .WithMetadata(put_object_params.user_metadata_map)
+      .WithGrantFullControl(put_object_params.fullcontrol_user_list)
+      .WithGrantRead(put_object_params.read_permission_user_list)
+      .WithGrantReadACP(put_object_params.read_acl_user_list)
+      .WithGrantWriteACP(put_object_params.write_acl_user_list);
+    request.SetContentType(put_object_params.content_type);
+    setCannedAcl<RequestType>(request, put_object_params.canned_acl);
+    return request;
+  }
+
+  template<typename ResultType>
+  PutObjectResult createPutObjectResult(const ResultType& upload_result) {
+    PutObjectResult put_object_result;
+    // Etags are returned by AWS in quoted form that should be removed
+    put_object_result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), 
'"');
+    put_object_result.version = upload_result.GetVersionId();
+
+    // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+    // s3.expiration only needs the date member of this pair
+    put_object_result.expiration = 
getExpiration(upload_result.GetExpiration()).expiration_time;
+    put_object_result.ssealgorithm = 
getEncryptionString(upload_result.GetServerSideEncryption());
+    return put_object_result;
+  }

Review Comment:
   ```suggestion
     PutObjectResult createPutObjectResult(const auto& upload_result) {
       return {
         // Etags are returned by AWS in quoted form that should be removed
         .etag = 
minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), 
'"'),
         .version = upload_result.GetVersionId(),
   
         // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
         // s3.expiration only needs the date member of this pair
         .expiration = 
getExpiration(upload_result.GetExpiration()).expiration_time,
         .ssealgorithm = 
getEncryptionString(upload_result.GetServerSideEncryption())
       };
     }
   ```



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -298,4 +407,58 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const 
GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const 
Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, 
std::optional<std::chrono::milliseconds> age_off_limit,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    // if age_off_limit is set only list the aged off uploads
+    if (age_off_limit && now - upload.GetInitiated() <= *age_off_limit) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' 
has not aged off yet", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);

Review Comment:
   I'd do this for simplicity
   ```suggestion
       filtered_uploads.push_back({.key = upload.GetKey(), .upload_id = 
upload.GetUploadId()});
   ```



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties 
&common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, 
not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters 
list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, 
common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = 
s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket 
'%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id 
'%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters 
abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, 
common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and 
upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, 
abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket 
'%s'", aborted, common_properties.bucket);
+  }
+  
s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   Shouldn't the whole age-off block be synchronized, from the early check of 
`last_ageoff_time_` against `now` up to this store? For example, by doing a 
compare-exchange (cmpxchg) at the start, together with the interval check for 
expired age-off parts.



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -263,17 +372,17 @@ std::optional<HeadObjectResult> 
S3Wrapper::headObject(const HeadObjectRequestPar
 template<typename ListRequest>
 ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
   ListRequest request;
-  request.SetBucket(params.bucket);
-  request.SetDelimiter(params.delimiter);
-  request.SetPrefix(params.prefix);
+  request.WithBucket(params.bucket)
+    .WithDelimiter(params.delimiter)
+    .WithPrefix(params.prefix);

Review Comment:
   I'm not sure how safe it is long-term to rely on `With*` functions modifying 
the state in-place. I'd do something like this instead:
   ```c++
   return ListRequest{}
       .WithBucket(params.bucket)
       .WithDelimiter(params.delimiter)
       .WithPrefix(params.prefix);
   ```



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public 
minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only 
list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& 
creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : 
Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& 
put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t 
flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t 
multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& 
get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const 
ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const 
GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const 
HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const 
ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& 
params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds 
multipart_upload_max_age_threshold);
+  void 
initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*>
 state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const 
{
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& 
kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& 
put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)

Review Comment:
   ```suggestion
       auto request = RequestType{}.WithBucket(put_object_params.bucket)
   ```



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -235,8 +344,8 @@ std::optional<std::vector<ListedObjectAttributes>> 
S3Wrapper::listBucket(const L
 
 std::optional<std::map<std::string, std::string>> 
S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
   Aws::S3::Model::GetObjectTaggingRequest request;
-  request.SetBucket(params.bucket);
-  request.SetKey(params.object_key);
+  request.WithBucket(params.bucket)
+    .WithKey(params.object_key);

Review Comment:
   In this case, I'd write it like this:
   ```c++
   auto request = 
Aws::S3::Model::GetObjectTaggingRequest{}.WithBucket(params.bucket).WithKey(params.object_key);
   ```
   Or to modify in-place, I'd add self-assignment, just in case it stops 
returning a modified reference to `*this`, and starts returning a new object:
   ```c++
   request = request.WithBucket(params.bucket);
   ```



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -200,13 +306,15 @@ void PutS3Object::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
+  ageOffMultipartUploads(*common_properties);
+
   auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, 
*common_properties);
   if (!put_s3_request_params) {
     session->transfer(flow_file, Failure);
     return;
   }
 
-  PutS3Object::ReadCallback callback(flow_file->getSize(), 
*put_s3_request_params, s3_wrapper_);
+  ReadCallback callback(flow_file->getSize(), *put_s3_request_params, 
s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);

Review Comment:
   You could also inject `this` to keep things simpler, or replace the whole 
callback class with an inline lambda with `[this, &flow_file, 
&put_s3_request_params]` capture.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to