adamdebreceni commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1257924453


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,160 @@ std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const 
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& 
read_size_out) {
+  std::array<std::byte, BUFFER_SIZE> buffer{};
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, 
BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, 
next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), 
gsl::narrow<std::streamsize>(read_ret));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = 
createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, 
put_object_params.credentials, put_object_params.client_config, 
put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const auto div_ceil = [](size_t n, size_t d) {
+    if (n % d == 0)
+      return n / d;
+    else
+      return n / d + 1;
+  };
+  const size_t part_count = div_ceil(flow_size, upload_state.part_size);
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t part_number = start_part; part_number <= last_part; 
++part_number) {
+    uint64_t read_size{};
+    const auto remaining = flow_size - total_read;
+    const auto next_read_size = remaining < upload_state.part_size ? remaining 
: upload_state.part_size;

Review Comment:
   `std::min`? i.e. `const auto next_read_size = std::min(remaining, upload_state.part_size);` — same behavior as the conditional, clearer intent (matches the `(std::min)` usage in `readFlowFileStream` above).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to