lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234036346
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
return "";
}
-std::optional<PutObjectResult> S3Wrapper::putObject(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<Aws::IOStream>& data_stream) {
- Aws::S3::Model::PutObjectRequest request;
- request.SetBucket(put_object_params.bucket);
- request.SetKey(put_object_params.object_key);
-
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
- request.SetContentType(put_object_params.content_type);
- request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const
std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t&
read_size_out) {
+ std::vector<std::byte> buffer;
+ buffer.resize(BUFFER_SIZE);
+ auto data_stream = std::make_shared<Aws::StringStream>();
+ uint64_t read_size = 0;
+ while (read_size < read_limit) {
+ const auto next_read_size = (std::min)(read_limit - read_size,
BUFFER_SIZE);
+ const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0,
next_read_size));
+ if (io::isError(read_ret)) {
+ throw StreamReadException("Reading flow file inputstream failed!");
+ }
+ if (read_ret > 0) {
+ data_stream->write(reinterpret_cast<char*>(buffer.data()),
gsl::narrow<std::streamsize>(next_read_size));
+ read_size += read_ret;
+ } else {
+ break;
+ }
+ }
+ read_size_out = read_size;
+ return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+ uint64_t read_size{};
+ auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+ auto request =
createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
request.SetBody(data_stream);
- request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
- request.SetGrantRead(put_object_params.read_permission_user_list);
- request.SetGrantReadACP(put_object_params.read_acl_user_list);
- request.SetGrantWriteACP(put_object_params.write_acl_user_list);
- setCannedAcl(request, put_object_params.canned_acl);
auto aws_result = request_sender_->sendPutObjectRequest(request,
put_object_params.credentials, put_object_params.client_config,
put_object_params.use_virtual_addressing);
if (!aws_result) {
return std::nullopt;
}
- PutObjectResult result;
- // Etags are returned by AWS in quoted form that should be removed
- result.etag =
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
- result.version = aws_result->GetVersionId();
+ return createPutObjectResult(*aws_result);
+}
- // GetExpiration returns a string pair with a date and a ruleid in
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
- // s3.expiration only needs the date member of this pair
- result.expiration =
getExpiration(aws_result->GetExpiration()).expiration_time;
- result.ssealgorithm =
getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const
PutObjectRequestParameters& put_object_params, const
std::shared_ptr<io::InputStream>& stream,
+ MultipartUploadState upload_state) {
+ stream->seek(upload_state.uploaded_size);
+ S3Wrapper::UploadPartsResult result;
+ result.upload_id = upload_state.upload_id;
+ result.part_etags = upload_state.uploaded_etags;
+ const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+ const size_t part_count = flow_size % upload_state.part_size == 0 ?
flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+ size_t total_read = 0;
+ const size_t start_part = upload_state.uploaded_parts + 1;
+ const size_t last_part = start_part + part_count - 1;
+ for (size_t i = start_part; i <= last_part; ++i) {
+ uint64_t read_size{};
+ const auto remaining = flow_size - total_read;
+ const auto next_read_size = remaining < upload_state.part_size ? remaining
: upload_state.part_size;
+ auto stream_ptr = readFlowFileStream(stream, next_read_size, read_size);
+ total_read += read_size;
+
+ Aws::S3::Model::UploadPartRequest upload_part_request;
+ upload_part_request.WithBucket(put_object_params.bucket)
+ .WithKey(put_object_params.object_key)
+ .WithPartNumber(i)
+ .WithUploadId(upload_state.upload_id);
+ upload_part_request.SetBody(stream_ptr);
+
+ Aws::Utils::ByteBuffer
part_md5(Aws::Utils::HashingUtils::CalculateMD5(*stream_ptr));
+
upload_part_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+ auto upload_part_result =
request_sender_->sendUploadPartRequest(upload_part_request,
put_object_params.credentials, put_object_params.client_config,
put_object_params.use_virtual_addressing);
+ if (!upload_part_result) {
+ logger_->log_error("Failed to upload part %d of %d of S3 object with key
'%s'", i, last_part, put_object_params.object_key);
+ return std::nullopt;
+ }
+ result.part_etags.push_back(upload_part_result->GetETag());
+ upload_state.uploaded_etags.push_back(upload_part_result->GetETag());
+ upload_state.uploaded_parts += 1;
+ upload_state.uploaded_size += read_size;
+ multipart_upload_storage_->storeState(put_object_params.bucket,
put_object_params.object_key, upload_state);
+ logger_->log_info("Uploaded part %d of %d S3 object with key '%s'", i,
last_part, put_object_params.object_key);
+ }
+
+ multipart_upload_storage_->removeState(put_object_params.bucket,
put_object_params.object_key);
return result;
}
+std::optional<Aws::S3::Model::CompleteMultipartUploadResult>
S3Wrapper::completeMultipartUpload(const PutObjectRequestParameters&
put_object_params,
+ const S3Wrapper::UploadPartsResult& upload_parts_result) {
+ Aws::S3::Model::CompleteMultipartUploadRequest
complete_multipart_upload_request;
+ complete_multipart_upload_request.WithBucket(put_object_params.bucket)
+ .WithKey(put_object_params.object_key)
+ .WithUploadId(upload_parts_result.upload_id);
+
+ Aws::S3::Model::CompletedMultipartUpload completed_multipart_upload;
+ for (size_t i = 0; i < upload_parts_result.part_etags.size(); ++i) {
+ Aws::S3::Model::CompletedPart part;
+ part.WithETag(upload_parts_result.part_etags[i])
+ .WithPartNumber(i + 1);
+ completed_multipart_upload.AddParts(part);
+ }
+
+
complete_multipart_upload_request.SetMultipartUpload(completed_multipart_upload);
+
+ return
request_sender_->sendCompleteMultipartUploadRequest(complete_multipart_upload_request,
put_object_params.credentials,
+ put_object_params.client_config, put_object_params.use_virtual_addressing);
+}
+
+bool S3Wrapper::multipartUploadExistsInS3(const PutObjectRequestParameters&
put_object_params) {
+ ListMultipartUploadsRequestParameters params(put_object_params.credentials,
put_object_params.client_config);
+ params.bucket = put_object_params.bucket;
+ auto pending_uploads = listMultipartUploads(params);
+ if (!pending_uploads) {
+ return false;
+ }
+
+ return ranges::find_if(*pending_uploads, [&](const auto& upload) { return
upload.key == put_object_params.object_key; }) != pending_uploads->end();
Review Comment:
Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const
GetObjectRequestParamet
return result;
}
+void S3Wrapper::addListMultipartUploadResults(const
Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads,
std::optional<std::chrono::milliseconds> max_upload_age,
+ std::vector<MultipartUpload>& filtered_uploads) {
+ const auto now = Aws::Utils::DateTime::Now();
+ for (const auto& upload : uploads) {
+ if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+ logger_->log_debug("Multipart upload with key '%s' and upload id '%s'
did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+ continue;
+ }
Review Comment:
No — we actually want to skip the listed uploads that have not yet reached the
age limit, because we only want to list the uploads that have already passed
the max upload age, so that the old pending uploads can be deleted.
##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const
GetObjectRequestParamet
return result;
}
+void S3Wrapper::addListMultipartUploadResults(const
Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads,
std::optional<std::chrono::milliseconds> max_upload_age,
+ std::vector<MultipartUpload>& filtered_uploads) {
+ const auto now = Aws::Utils::DateTime::Now();
+ for (const auto& upload : uploads) {
+ if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+ logger_->log_debug("Multipart upload with key '%s' and upload id '%s'
did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+ continue;
+ }
+
+ MultipartUpload filtered_upload;
+ filtered_upload.key = upload.GetKey();
+ filtered_upload.upload_id = upload.GetUploadId();
+ filtered_uploads.push_back(filtered_upload);
+ }
+}
+
+std::optional<std::vector<MultipartUpload>>
S3Wrapper::listMultipartUploads(const ListMultipartUploadsRequestParameters&
params) {
+ std::vector<MultipartUpload> result;
+ std::optional<Aws::S3::Model::ListMultipartUploadsResult> aws_result;
+ Aws::S3::Model::ListMultipartUploadsRequest request;
+ request.SetBucket(params.bucket);
+ do {
+ aws_result = request_sender_->sendListMultipartUploadsRequest(request,
params.credentials, params.client_config, params.use_virtual_addressing);
+ if (!aws_result) {
+ return std::nullopt;
+ }
+ const auto& uploads = aws_result->GetUploads();
+ logger_->log_debug("AWS S3 List operation returned %zu multipart uploads.
This result is%s truncated.", uploads.size(), aws_result->GetIsTruncated() ? ""
: " not");
+ addListMultipartUploadResults(uploads, params.upload_max_age, result);
+ if (aws_result->GetIsTruncated()) {
+ request.SetKeyMarker(aws_result->GetNextKeyMarker());
+ }
+ } while (aws_result->GetIsTruncated());
+
+ return result;
+}
+
+bool S3Wrapper::abortMultipartUpload(const
AbortMultipartUploadRequestParameters& params) {
+ Aws::S3::Model::AbortMultipartUploadRequest request;
+ request.WithBucket(params.bucket)
+ .WithKey(params.key)
+ .WithUploadId(params.upload_id);
+ return request_sender_->sendAbortMultipartUploadRequest(request,
params.credentials, params.client_config, params.use_virtual_addressing);
+}
+
+void S3Wrapper::initailizeMultipartUploadStateStorage(const std::string&
multipart_temp_dir, const std::string& state_id) {
Review Comment:
Good catch, updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59
##########
extensions/aws/tests/PutS3ObjectTests.cpp:
##########
@@ -71,7 +87,7 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS
credential setting", "[awsCr
setCredentialsService();
}
- test_controller.runSession(plan, true);
+ test_controller.runSession(plan);
Review Comment:
I checked, and in the clang job the `extensions/aws/tests/PutS3ObjectTests.cpp`
file was listed as one of the parameters of clang-tidy, so it should have run on
that file. I'm not sure why it hasn't complained about this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]