lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258086012
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,160 @@ std::string
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
return "";
}
-std::optional<PutObjectResult> S3Wrapper::putObject(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<Aws::IOStream>& data_stream) {
- Aws::S3::Model::PutObjectRequest request;
- request.SetBucket(put_object_params.bucket);
- request.SetKey(put_object_params.object_key);
-
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
- request.SetContentType(put_object_params.content_type);
- request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t&
read_size_out) {
+ std::array<std::byte, BUFFER_SIZE> buffer{};
+ auto data_stream = std::make_shared<Aws::StringStream>();
+ uint64_t read_size = 0;
+ while (read_size < read_limit) {
+ const auto next_read_size = (std::min)(read_limit - read_size,
BUFFER_SIZE);
+ const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0,
next_read_size));
+ if (io::isError(read_ret)) {
+ throw StreamReadException("Reading flow file inputstream failed!");
+ }
+ if (read_ret > 0) {
+ data_stream->write(reinterpret_cast<char*>(buffer.data()),
gsl::narrow<std::streamsize>(read_ret));
+ read_size += read_ret;
+ } else {
+ break;
+ }
+ }
+ read_size_out = read_size;
+ return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+ uint64_t read_size{};
+ auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+ auto request =
createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
request.SetBody(data_stream);
- request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
- request.SetGrantRead(put_object_params.read_permission_user_list);
- request.SetGrantReadACP(put_object_params.read_acl_user_list);
- request.SetGrantWriteACP(put_object_params.write_acl_user_list);
- setCannedAcl(request, put_object_params.canned_acl);
auto aws_result = request_sender_->sendPutObjectRequest(request,
put_object_params.credentials, put_object_params.client_config,
put_object_params.use_virtual_addressing);
if (!aws_result) {
return std::nullopt;
}
- PutObjectResult result;
- // Etags are returned by AWS in quoted form that should be removed
- result.etag =
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
- result.version = aws_result->GetVersionId();
+ return createPutObjectResult(*aws_result);
+}
- // GetExpiration returns a string pair with a date and a ruleid in
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
- // s3.expiration only needs the date member of this pair
- result.expiration =
getExpiration(aws_result->GetExpiration()).expiration_time;
- result.ssealgorithm =
getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream,
+ MultipartUploadState upload_state) {
+ stream->seek(upload_state.uploaded_size);
+ S3Wrapper::UploadPartsResult result;
+ result.upload_id = upload_state.upload_id;
+ result.part_etags = upload_state.uploaded_etags;
+ const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+ const auto div_ceil = [](size_t n, size_t d) {
+ if (n % d == 0)
+ return n / d;
+ else
+ return n / d + 1;
+ };
+ const size_t part_count = div_ceil(flow_size, upload_state.part_size);
+ size_t total_read = 0;
+ const size_t start_part = upload_state.uploaded_parts + 1;
+ const size_t last_part = start_part + part_count - 1;
+ for (size_t part_number = start_part; part_number <= last_part;
++part_number) {
+ uint64_t read_size{};
+ const auto remaining = flow_size - total_read;
+ const auto next_read_size = remaining < upload_state.part_size ? remaining
: upload_state.part_size;
Review Comment:
Updated in 7f27cd5bfdf869971ebab9f706b82405c0e16913
##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const
std::string& key, const MultipartUploadState& state) {
+ std::unordered_map<std::string, std::string> stored_state;
+ state_manager_->get(stored_state);
+ std::string state_key = bucket + "/" + key;
+ stored_state[state_key + ".upload_id"] = state.upload_id;
+ stored_state[state_key + ".upload_time"] =
std::to_string(state.upload_time.Millis());
+ stored_state[state_key + ".uploaded_parts"] =
std::to_string(state.uploaded_parts);
+ stored_state[state_key + ".uploaded_size"] =
std::to_string(state.uploaded_size);
+ stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+ stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+ stored_state[state_key + ".uploaded_etags"] =
minifi::utils::StringUtils::join(";", state.uploaded_etags);
+ state_manager_->set(stored_state);
+ state_manager_->commit();
+ state_manager_->persist();
+}
+
+std::optional<MultipartUploadState>
MultipartUploadStateStorage::getState(const std::string& bucket, const
std::string& key) const {
+ std::unordered_map<std::string, std::string> state_map;
+ if (!state_manager_->get(state_map)) {
+ logger_->log_warn("No previous multipart upload state was associated with
this processor.");
+ return std::nullopt;
+ }
+ std::string state_key = bucket + "/" + key;
+ if (!state_map.contains(state_key + ".upload_id")) {
+ logger_->log_warn("Multipart upload state was not found for key '%s'",
state_key);
+ return std::nullopt;
+ }
+
+ MultipartUploadState state;
+ state.upload_id = state_map[state_key + ".upload_id"];
+
+ int64_t stored_upload_time = 0;
+ core::Property::StringToInt(state_map[state_key + ".upload_time"],
stored_upload_time);
+ state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+ core::Property::StringToInt(state_map[state_key + ".uploaded_parts"],
state.uploaded_parts);
+ core::Property::StringToInt(state_map[state_key + ".uploaded_size"],
state.uploaded_size);
+ core::Property::StringToInt(state_map[state_key + ".part_size"],
state.part_size);
+ core::Property::StringToInt(state_map[state_key + ".full_size"],
state.full_size);
+ state.uploaded_etags =
minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_map[state_key +
".uploaded_etags"], ";");
+ return state;
+}
+
+void MultipartUploadStateStorage::removeKey(const std::string& state_key,
std::unordered_map<std::string, std::string>& state_map) {
+ state_map.erase(state_key + ".upload_id");
+ state_map.erase(state_key + ".upload_time");
+ state_map.erase(state_key + ".uploaded_parts");
+ state_map.erase(state_key + ".uploaded_size");
+ state_map.erase(state_key + ".part_size");
+ state_map.erase(state_key + ".full_size");
+ state_map.erase(state_key + ".uploaded_etags");
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const
std::string& key) {
+ std::unordered_map<std::string, std::string> state_map;
+ if (!state_manager_->get(state_map)) {
+ logger_->log_warn("No previous multipart upload state was associated with
this processor.");
+ return;
+ }
+ std::string state_key = bucket + "/" + key;
+ if (!state_map.contains(state_key + ".upload_id")) {
+ logger_->log_warn("Multipart upload state was not found for key '%s'",
state_key);
+ return;
+ }
+
+ removeKey(state_key, state_map);
+ state_manager_->set(state_map);
+ state_manager_->commit();
+ state_manager_->persist();
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds
multipart_upload_max_age_threshold) {
+ std::unordered_map<std::string, std::string> state_map;
+ if (!state_manager_->get(state_map)) {
+ logger_->log_warn("No previous multipart upload state was associated with
this processor.");
+ return;
+ }
+ auto age_off_time = Aws::Utils::DateTime::Now() -
multipart_upload_max_age_threshold;
+
+ std::vector<std::string> keys_to_remove;
+ for (const auto& [property_key, value] : state_map) {
+ if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+ continue;
+ }
+ int64_t stored_upload_time{};
+ if (!core::Property::StringToInt(value, stored_upload_time)) {
+ logger_->log_error("Multipart upload cache key '%s' has invalid value
'%s'", property_key, value);
+ continue;
+ }
+ auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+ if (upload_time < age_off_time) {
+ auto state_key_and_property_name =
minifi::utils::StringUtils::split(property_key, ".");
+ keys_to_remove.push_back(state_key_and_property_name[0]);
Review Comment:
Good point, updated in 7f27cd5bfdf869971ebab9f706b82405c0e16913
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org