lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234035975


##########
docker/test/integration/cluster/checkers/AwsChecker.py:
##########
@@ -29,6 +29,16 @@ def check_s3_server_object_data(self, container_name, 
test_data):
         (code, file_data) = 
self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir 
+ "/binaryData"])
         return code == 0 and file_data == test_data
 
+    @retry_check()
+    def check_s3_server_object_hash(self, container_name: str, 
expected_file_hash: str):
+        (code, output) = 
self.container_communicator.execute_command(container_name, ["find", 
"/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, md5_output) = 
self.container_communicator.execute_command(container_name, ["md5sum", 
s3_mock_dir + "/binaryData"])
+        file_hash = md5_output.split(' ')[0].strip()
+        return code == 0 and file_hash == expected_file_hash

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
docker/test/integration/features/steps/steps.py:
##########
@@ -275,6 +275,11 @@ def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
 
+@given("a file with {size} content is present in \"{path}\"")

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& 
state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = 
minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / 
std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);

Review Comment:
   Good catch, it shouldn't be needed. I also corrected a mistake in the
`state_directory.empty()` branch and added a test for it in
b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& 
state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = 
minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / 
std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const 
std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = 
std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = 
minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> 
MultipartUploadStateStorage::getState(const std::string& bucket, const 
std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not 
found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), 
stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), 
state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), 
state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), 
state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), 
state.full_size);
+  state.uploaded_etags = 
minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + 
".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const 
std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", 
state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds 
multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - 
multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache 
key '%s'", property_key);
+      continue;
+    }

Review Comment:
   Good point, removed in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& 
state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = 
minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / 
std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const 
std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = 
std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = 
minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> 
MultipartUploadStateStorage::getState(const std::string& bucket, const 
std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not 
found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), 
stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), 
state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), 
state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), 
state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), 
state.full_size);
+  state.uploaded_etags = 
minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + 
".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const 
std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", 
state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds 
multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - 
multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache 
key '%s'", property_key);
+      continue;
+    }
+    int64_t stored_upload_time{};
+    if (!core::Property::StringToInt(value, stored_upload_time)) {
+      logger_->log_error("Multipart upload cache key '%s' has invalid value 
'%s'", property_key, value);
+      continue;
+    }
+    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+    if (upload_time < age_off_time) {
+      auto state_key_and_property_name = 
minifi::utils::StringUtils::split(property_key, ".");
+      if (state_key_and_property_name.size() < 2) {
+        logger_->log_error("Invalid property '%s'", property_key);
+        continue;
+      }

Review Comment:
   Removed in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const 
std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "wk" || unit == "wks" || unit == "week" || 
unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   Added in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const 
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& 
read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);

Review Comment:
   I see that the project uses both approaches, but using `std::array` here
feels a bit better. As Marton mentioned, for our use cases there shouldn't be
any significant difference between the two versions.
Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const 
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& 
read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, 
BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, 
next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), 
gsl::narrow<std::streamsize>(next_read_size));

Review Comment:
   I think you are right: although the two should be the same, it's safer
to use the actual number of bytes read. Updated in
b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const 
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& 
read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, 
BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, 
next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), 
gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = 
createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, 
put_object_params.credentials, put_object_params.client_config, 
put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? 
flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const 
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& 
read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, 
BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, 
next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), 
gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = 
createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, 
put_object_params.credentials, put_object_params.client_config, 
put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const 
PutObjectRequestParameters& put_object_params, const 
std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? 
flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {

Review Comment:
   Renamed in b45280b22c6f4e74f36cef4fd4b6305f177abe59



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to