szaszm commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r590135395
##########
File path: extensions/aws/s3/S3Wrapper.cpp
##########
@@ -30,46 +37,291 @@ namespace minifi {
namespace aws {
namespace s3 {
-minifi::utils::optional<Aws::S3::Model::PutObjectResult>
S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest&
request) {
- Aws::S3::S3Client s3_client(credentials_, client_config_);
- auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+ absolute_path = key;
+ std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key,
true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() :
request_sender_(minifi::utils::make_unique<S3ClientRequestSender>()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr<S3RequestSender> request_sender) :
request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+ request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+ request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+ request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+ request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+ request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const
std::string& canned_acl) const {
+ if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) ==
CANNED_ACL_MAP.end())
+ return;
- if (outcome.IsSuccess()) {
- logger_->log_info("Added S3 object '%s' to bucket '%s'",
request.GetKey(), request.GetBucket());
- return outcome.GetResultWithOwnership();
- } else {
- logger_->log_error("PutS3Object failed with the following: '%s'",
outcome.GetError().GetMessage());
+ logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+ request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+ minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+ const auto match = expr.match(expiration);
+ const auto& results = expr.getResult();
+ if (!match || results.size() < 3)
+ return Expiration{};
+ return Expiration{results[1], results[2]};
+}
+
+std::string
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption)
{
+ if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+ return "";
+ }
+
+ auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(),
SERVER_SIDE_ENCRYPTION_MAP.end(),
+ [&](const std::pair<std::string, const
Aws::S3::Model::ServerSideEncryption>& pair) {
+ return pair.second == encryption;
+ });
+ if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+ return it->first;
+ }
+ return "";
+}
+
+minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const
PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream>
data_stream) {
+ Aws::S3::Model::PutObjectRequest request;
+ request.SetBucket(put_object_params.bucket);
+ request.SetKey(put_object_params.object_key);
+
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+ request.SetContentType(put_object_params.content_type);
+ request.SetMetadata(put_object_params.user_metadata_map);
+ request.SetBody(data_stream);
+ request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+ request.SetGrantRead(put_object_params.read_permission_user_list);
+ request.SetGrantReadACP(put_object_params.read_acl_user_list);
+ request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+ setCannedAcl(request, put_object_params.canned_acl);
+
+ auto aws_result = request_sender_->sendPutObjectRequest(request);
+ if (!aws_result) {
return minifi::utils::nullopt;
}
+
+ PutObjectResult result;
+ // Etags are returned by AWS in quoted form that should be removed
+ result.etag =
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+ result.version = aws_result->GetVersionId();
+
+ // GetExpiration returns a string pair with a date and a ruleid in
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+ // s3.expiration only needs the date member of this pair
+ result.expiration =
getExpiration(aws_result->GetExpiration()).expiration_time;
+ result.ssealgorithm =
getEncryptionString(aws_result->GetServerSideEncryption());
+ return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string&
object_key, const std::string& version) {
+ Aws::S3::Model::DeleteObjectRequest request;
+ request.SetBucket(bucket);
+ request.SetKey(object_key);
+ if (!version.empty()) {
+ request.SetVersionId(version);
+ }
+ return request_sender_->sendDeleteObjectRequest(request);
}
-bool S3Wrapper::sendDeleteObjectRequest(const
Aws::S3::Model::DeleteObjectRequest& request) {
- Aws::S3::S3Client s3_client(credentials_, client_config_);
- Aws::S3::Model::DeleteObjectOutcome outcome =
s3_client.DeleteObject(request);
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t
data_size, io::BaseStream& output) {
+ static const int64_t BUFFER_SIZE = 4096;
+ std::vector<uint8_t> buffer;
+ buffer.resize(BUFFER_SIZE);
- if (outcome.IsSuccess()) {
- logger_->log_info("Deleted S3 object '%s' from bucket '%s'",
request.GetKey(), request.GetBucket());
- return true;
- } else if (outcome.GetError().GetErrorType() ==
Aws::S3::S3Errors::NO_SUCH_KEY) {
- logger_->log_info("S3 object '%s' was not found in bucket '%s'",
request.GetKey(), request.GetBucket());
- return true;
- } else {
- logger_->log_error("DeleteS3Object failed with the following: '%s'",
outcome.GetError().GetMessage());
- return false;
+ int64_t write_size = 0;
+ while (write_size < data_size) {
+ auto next_write_size = (std::min)(data_size - write_size, BUFFER_SIZE);
+ if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size))
{
+ return -1;
+ }
+ auto ret = output.write(buffer.data(), next_write_size);
+ if (ret < 0) {
+ return ret;
+ }
+ write_size += next_write_size;
}
+ return write_size;
}
-minifi::utils::optional<Aws::S3::Model::GetObjectResult>
S3Wrapper::sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest&
request) {
- Aws::S3::S3Client s3_client(credentials_, client_config_);
- auto outcome = s3_client.GetObject(request);
+minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const
GetObjectRequestParameters& get_object_params, const
std::shared_ptr<io::BaseStream>& out_body) {
+ auto request =
createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
+ auto aws_result = request_sender_->sendGetObjectRequest(request);
+ if (!aws_result) {
+ return minifi::utils::nullopt;
+ }
+ auto result = fillFetchObjectResult<Aws::S3::Model::GetObjectResult,
GetObjectResult>(get_object_params, aws_result.value());
+ result.write_size = writeFetchedBody(aws_result->GetBody(),
aws_result->GetContentLength(), *out_body);
+ return result;
Review comment:
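I'd convert the `out_body` parameter to take a plain reference (`io::BaseStream&`) instead of a `const std::shared_ptr<io::BaseStream>&`, too.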
```suggestion
minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const
GetObjectRequestParameters& get_object_params, io::BaseStream& out_body) {
auto request =
createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
auto aws_result = request_sender_->sendGetObjectRequest(request);
if (!aws_result) {
return minifi::utils::nullopt;
}
auto result = fillFetchObjectResult<Aws::S3::Model::GetObjectResult,
GetObjectResult>(get_object_params, *aws_result);
result.write_size = writeFetchedBody(aws_result->GetBody(),
aws_result->GetContentLength(), out_body);
return result;
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org