arpadboda commented on a change in pull request #915:
URL: https://github.com/apache/nifi-minifi-cpp/pull/915#discussion_r512621151
##########
File path: cmake/BundledAwsSdkCpp.cmake
##########
@@ -68,40 +77,40 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
ExternalProject_Add(
aws-c-common-external
GIT_REPOSITORY "https://github.com/awslabs/aws-c-common.git"
- GIT_TAG "ac02e1728d740bb9106b6ea727cd3378f8ea438a"
+ GIT_TAG "d8f6f067975cd3670c62cca0455b9d381db19756"
SOURCE_DIR "${BINARY_DIR}/thirdparty/aws-c-common-src"
INSTALL_DIR "${BINARY_DIR}/thirdparty/libaws-install"
LIST_SEPARATOR % # This is needed for passing semicolon-separated
lists
CMAKE_ARGS ${AWS_C_COMMON_CMAKE_ARGS}
- BUILD_BYPRODUCTS
"${BINARY_DIR}/thirdparty/libaws-install/lib/libaws-c-common.${SUFFIX}"
+ BUILD_BYPRODUCTS
"${BINARY_DIR}/thirdparty/libaws-install/${CMAKE_INSTALL_LIBDIR}/${PREFIX}aws-c-common.${SUFFIX}"
EXCLUDE_FROM_ALL TRUE
)
ExternalProject_Add(
aws-checksum-external
GIT_REPOSITORY "https://github.com/awslabs/aws-checksums.git"
- GIT_TAG "41dc36d14b0898bd34e3f91c808fcb00f5e21875"
+ GIT_TAG "8e1a84c2924774db1b9d945c556343b217d71d05"
SOURCE_DIR "${BINARY_DIR}/thirdparty/aws-checksums-src"
INSTALL_DIR "${BINARY_DIR}/thirdparty/libaws-install"
LIST_SEPARATOR % # This is needed for passing semicolon-separated
lists
CMAKE_ARGS ${AWS_CHECKSUM_CMAKE_ARGS}
- BUILD_BYPRODUCTS
"${BINARY_DIR}/thirdparty/libaws-install/lib/libaws-checksums.${SUFFIX}"
+ BUILD_BYPRODUCTS
"${BINARY_DIR}/thirdparty/libaws-install/${CMAKE_INSTALL_LIBDIR}/${PREFIX}aws-checksums.${SUFFIX}"
EXCLUDE_FROM_ALL TRUE
)
ExternalProject_Add(
aws-c-event-stream-external
GIT_REPOSITORY "https://github.com/awslabs/aws-c-event-stream.git"
- GIT_TAG "97ab2e57e83ad114679dbee0dcfb5048640debe7"
+ GIT_TAG "3462b68d563d8f9b3a26517b833671a24ab81cc5"
SOURCE_DIR "${BINARY_DIR}/thirdparty/aws-c-event-stream-src"
INSTALL_DIR "${BINARY_DIR}/thirdparty/libaws-install"
LIST_SEPARATOR % # This is needed for passing semicolon-separated
lists
CMAKE_ARGS ${AWS_C_EVENT_STREAM_CMAKE_ARGS}
- BUILD_BYPRODUCTS
"${BINARY_DIR}/thirdparty/libaws-install/lib/libaws-c-event-stream.${SUFFIX}"
+ BUILD_BYPRODUCTS
"${BINARY_DIR}/thirdparty/libaws-install/${CMAKE_INSTALL_LIBDIR}/${PREFIX}aws-c-event-stream.${SUFFIX}"
EXCLUDE_FROM_ALL TRUE
)
ExternalProject_Add(
aws-sdk-cpp-external
GIT_REPOSITORY "https://github.com/aws/aws-sdk-cpp.git"
- GIT_TAG "1.7.109"
+ GIT_TAG "1.8.52"
Review comment:
👍
##########
File path: extensions/aws/processors/PutS3Object.h
##########
@@ -0,0 +1,197 @@
+/**
+ * @file PutS3Object.h
+ * PutS3Object class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sstream>
+#include <utility>
+#include <vector>
+#include <memory>
+#include <string>
+#include <set>
+
+#include "aws/core/auth/AWSCredentialsProvider.h"
+
+#include "S3Wrapper.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+namespace region {
+
+constexpr const char *AF_SOUTH_1 = "af-south-1";
+constexpr const char *AP_EAST_1 = "ap-east-1";
+constexpr const char *AP_NORTHEAST_1 = "ap-northeast-1";
+constexpr const char *AP_NORTHEAST_2 = "ap-northeast-2";
+constexpr const char *AP_NORTHEAST_3 = "ap-northeast-3";
+constexpr const char *AP_SOUTH_1 = "ap-south-1";
+constexpr const char *AP_SOUTHEAST_1 = "ap-southeast-1";
+constexpr const char *AP_SOUTHEAST_2 = "ap-southeast-2";
+constexpr const char *CA_CENTRAL_1 = "ca-central-1";
+constexpr const char *CN_NORTH_1 = "cn-north-1";
+constexpr const char *CN_NORTHWEST_1 = "cn-northwest-1";
+constexpr const char *EU_CENTRAL_1 = "eu-central-1";
+constexpr const char *EU_NORTH_1 = "eu-north-1";
+constexpr const char *EU_SOUTH_1 = "eu-south-1";
+constexpr const char *EU_WEST_1 = "eu-west-1";
+constexpr const char *EU_WEST_2 = "eu-west-2";
+constexpr const char *EU_WEST_3 = "eu-west-3";
+constexpr const char *ME_SOUTH_1 = "me-south-1";
+constexpr const char *SA_EAST_1 = "sa-east-1";
+constexpr const char *US_EAST_1 = "us-east-1";
+constexpr const char *US_EAST_2 = "us-east-2";
+constexpr const char *US_GOV_EAST_1 = "us-gov-east-1";
+constexpr const char *US_GOV_WEST_1 = "us-gov-west-1";
+constexpr const char *US_WEST_1 = "us-west-1";
+constexpr const char *US_WEST_2 = "us-west-2";
+
+} // namespace region
+
+class PutS3Object : public core::Processor {
+ public:
+ static constexpr char const* ProcessorName = "PutS3Object";
+
+ static const std::set<std::string> CANNED_ACLS;
+ static const std::set<std::string> REGIONS;
+ static const std::set<std::string> STORAGE_CLASSES;
+ static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
+
+ // Supported Properties
+ static const core::Property ObjectKey;
+ static const core::Property Bucket;
+ static const core::Property ContentType;
+ static const core::Property AccessKey;
+ static const core::Property SecretKey;
+ static const core::Property CredentialsFile;
+ static const core::Property AWSCredentialsProviderService;
+ static const core::Property StorageClass;
+ static const core::Property ServerSideEncryption;
+ static const core::Property Region;
+ static const core::Property CommunicationsTimeout;
+ static const core::Property FullControlUserList;
+ static const core::Property ReadPermissionUserList;
+ static const core::Property ReadACLUserList;
+ static const core::Property WriteACLUserList;
+ static const core::Property CannedACL;
+ static const core::Property EndpointOverrideURL;
+ static const core::Property ProxyHost;
+ static const core::Property ProxyPort;
+ static const core::Property ProxyUsername;
+ static const core::Property ProxyPassword;
+
+ // Supported Relationships
+ static const core::Relationship Failure;
+ static const core::Relationship Success;
+
+ explicit PutS3Object(std::string name, minifi::utils::Identifier uuid =
minifi::utils::Identifier())
+ : PutS3Object(name, uuid,
minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
+ }
+
+ explicit PutS3Object(std::string name, minifi::utils::Identifier uuid,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+ : core::Processor(std::move(name), uuid)
+ , s3_wrapper_(std::move(s3_wrapper)) {
+ }
+
+ ~PutS3Object() override = default;
+
+ bool supportsDynamicProperties() override { return true; }
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
+
+ class ReadCallback : public InputStreamCallback {
+ public:
+ static const uint64_t MAX_SIZE = 5UL * 1024UL * 1024UL * 1024UL; // 5GB
limit on AWS
+ static const uint64_t BUFFER_SIZE = 4096;
+
+ ReadCallback(uint64_t flow_size, const
minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3WrapperBase*
s3_wrapper)
+ : flow_size_(flow_size)
+ , options_(options)
+ , s3_wrapper_(s3_wrapper) {
+ }
+
+ int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+ if (flow_size_ > MAX_SIZE) {
+ return -1;
+ }
+ std::vector<uint8_t> buffer;
+ auto data_stream = std::make_shared<std::stringstream>();
+ buffer.reserve(BUFFER_SIZE);
+ read_size_ = 0;
+ while (read_size_ < flow_size_) {
+ auto next_read_size = flow_size_ - read_size_ < BUFFER_SIZE ?
flow_size_ - read_size_ : BUFFER_SIZE;
Review comment:
The right-hand side looks like a std::min to me :)
##########
File path: extensions/aws/processors/PutS3Object.cpp
##########
@@ -0,0 +1,495 @@
+/**
+ * @file PutS3Object.cpp
+ * PutS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutS3Object.h"
+
+#include <string>
+#include <set>
+#include <memory>
+#include <map>
+
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+#include "utils/MapUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::set<std::string>
PutS3Object::CANNED_ACLS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::CANNED_ACL_MAP));
+const std::set<std::string> PutS3Object::REGIONS({region::AF_SOUTH_1,
region::AP_EAST_1, region::AP_NORTHEAST_1,
+ region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1,
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
+ region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1,
region::EU_CENTRAL_1, region::EU_NORTH_1,
+ region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3,
region::ME_SOUTH_1, region::SA_EAST_1,
+ region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1,
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
+const std::set<std::string>
PutS3Object::STORAGE_CLASSES(minifi::utils::MapUtils::getKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
+const std::set<std::string>
PutS3Object::SERVER_SIDE_ENCRYPTIONS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
+
+const core::Property PutS3Object::ObjectKey(
+ core::PropertyBuilder::createProperty("Object Key")
+ ->withDescription("The key of the S3 object. If none is given the filename
attribute will be used by default.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("The S3 bucket")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ContentType(
+ core::PropertyBuilder::createProperty("Content Type")
+ ->withDescription("Sets the Content-Type HTTP header indicating the type
of content stored in "
+ "the associated object. The value of this header is a
standard MIME type. "
+ "If no content type is provided the default content type
"
+ "\"application/octet-stream\" will be used.")
+ ->supportsExpressionLanguage(true)
+ ->withDefaultValue<std::string>("application/octet-stream")
+ ->build());
+const core::Property PutS3Object::AccessKey(
+ core::PropertyBuilder::createProperty("Access Key")
+ ->withDescription("AWS account access key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::SecretKey(
+ core::PropertyBuilder::createProperty("Secret Key")
+ ->withDescription("AWS account secret key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::CredentialsFile(
+ core::PropertyBuilder::createProperty("Credentials File")
+ ->withDescription("Path to a file containing AWS access key and secret key
in properties file format. Properties used: accessKey and secretKey")
+ ->build());
+const core::Property PutS3Object::AWSCredentialsProviderService(
+ core::PropertyBuilder::createProperty("AWS Credentials Provider service")
+ ->withDescription("The name of the AWS Credentials Provider controller
service that is used to obtain AWS credentials.")
+ ->build());
+const core::Property PutS3Object::StorageClass(
+ core::PropertyBuilder::createProperty("Storage Class")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>("Standard")
+ ->withAllowableValues<std::string>(PutS3Object::STORAGE_CLASSES)
+ ->withDescription("AWS S3 Storage Class")
+ ->build());
+const core::Property PutS3Object::Region(
+ core::PropertyBuilder::createProperty("Region")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(region::US_WEST_2)
+ ->withAllowableValues<std::string>(PutS3Object::REGIONS)
+ ->withDescription("AWS Region")
+ ->build());
+const core::Property PutS3Object::CommunicationsTimeout(
+ core::PropertyBuilder::createProperty("Communications Timeout")
+ ->isRequired(true)
+ ->withDefaultValue<core::TimePeriodValue>("30 sec")
+ ->withDescription("")
+ ->build());
+const core::Property PutS3Object::FullControlUserList(
+ core::PropertyBuilder::createProperty("FullControl User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have Full Control for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ReadPermissionUserList(
+ core::PropertyBuilder::createProperty("Read Permission User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have Read Access for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ReadACLUserList(
+ core::PropertyBuilder::createProperty("Read ACL User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have permissions to read "
+ "the Access Control List for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::WriteACLUserList(
+ core::PropertyBuilder::createProperty("Write ACL User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have permissions to change "
+ "the Access Control List for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::CannedACL(
+ core::PropertyBuilder::createProperty("Canned ACL")
+ ->withDescription("Amazon Canned ACL for an object. Allowed values:
BucketOwnerFullControl, BucketOwnerRead, AuthenticatedRead, "
+ "PublicReadWrite, PublicRead, Private, AwsExecRead; will
be ignored if any other ACL/permission property is specified.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::EndpointOverrideURL(
+ core::PropertyBuilder::createProperty("Endpoint Override URL")
+ ->withDescription("Endpoint URL to use instead of the AWS default
including scheme, host, "
+ "port, and path. The AWS libraries select an endpoint
URL based on the AWS "
+ "region, but this property overrides the selected
endpoint URL, allowing use "
+ "with other S3-compatible endpoints.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ServerSideEncryption(
+ core::PropertyBuilder::createProperty("Server Side Encryption")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>("None")
+ ->withAllowableValues<std::string>(PutS3Object::SERVER_SIDE_ENCRYPTIONS)
+ ->withDescription("Specifies the algorithm used for server side
encryption.")
+ ->build());
+const core::Property PutS3Object::ProxyHost(
+ core::PropertyBuilder::createProperty("Proxy Host")
+ ->withDescription("Proxy host name or IP")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ProxyPort(
+ core::PropertyBuilder::createProperty("Proxy Port")
+ ->withDescription("The port number of the proxy host")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ProxyUsername(
+ core::PropertyBuilder::createProperty("Proxy Username")
+ ->withDescription("Username to set when authenticating against proxy")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ProxyPassword(
+ core::PropertyBuilder::createProperty("Proxy Password")
+ ->withDescription("Password to set when authenticating against proxy")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship PutS3Object::Success("success", "FlowFiles are routed
to success relationship");
+const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed
to failure relationship");
+
+void PutS3Object::initialize() {
+ // Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(ObjectKey);
+ properties.insert(Bucket);
+ properties.insert(ContentType);
+ properties.insert(AccessKey);
+ properties.insert(SecretKey);
+ properties.insert(CredentialsFile);
+ properties.insert(AWSCredentialsProviderService);
+ properties.insert(StorageClass);
+ properties.insert(Region);
+ properties.insert(CommunicationsTimeout);
+ properties.insert(FullControlUserList);
+ properties.insert(ReadPermissionUserList);
+ properties.insert(ReadACLUserList);
+ properties.insert(WriteACLUserList);
+ properties.insert(CannedACL);
+ properties.insert(EndpointOverrideURL);
+ properties.insert(ServerSideEncryption);
+ properties.insert(ProxyHost);
+ properties.insert(ProxyPort);
+ properties.insert(ProxyUsername);
+ properties.insert(ProxyPassword);
+ setSupportedProperties(properties);
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Failure);
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromControllerService(const
std::shared_ptr<core::ProcessContext> &context) const {
+ std::string service_name;
+ if (context->getProperty(AWSCredentialsProviderService.getName(),
service_name) && !service_name.empty()) {
+ std::shared_ptr<core::controller::ControllerService> service =
context->getControllerService(service_name);
+ if (nullptr != service) {
+ auto aws_credentials_service =
std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
+ if (aws_credentials_service) {
+ return
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
+ }
+ }
+ }
+ return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromProperties(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) const {
+ std::string access_key;
+ context->getProperty(AccessKey, access_key, flow_file);
+ std::string secret_key;
+ context->getProperty(SecretKey, secret_key, flow_file);
+ if (!access_key.empty() && !secret_key.empty()) {
+ Aws::Auth::AWSCredentials creds(access_key, secret_key);
+ return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+ }
+ return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromFile(const
std::shared_ptr<core::ProcessContext> &context) const {
+ std::string credential_file;
+ if (context->getProperty(CredentialsFile.getName(), credential_file) &&
!credential_file.empty()) {
+ auto properties = std::make_shared<minifi::Properties>();
+ properties->loadConfigureFile(credential_file.c_str());
+ std::string access_key;
+ std::string secret_key;
+ if (properties->getString("accessKey", access_key) && !access_key.empty()
&& properties->getString("secretKey", secret_key) && !secret_key.empty()) {
+ Aws::Auth::AWSCredentials creds(access_key, secret_key);
+ return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+ }
+ }
+ return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentials(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) const {
+ auto prop_cred = getAWSCredentialsFromProperties(context, flow_file);
+ if (prop_cred) {
+ logger_->log_info("AWS Credentials successfully set from properties");
+ return prop_cred.value();
+ }
+
+ auto file_cred = getAWSCredentialsFromFile(context);
+ if (file_cred) {
+ logger_->log_info("AWS Credentials successfully set from file");
+ return file_cred.value();
+ }
+
+ auto service_cred = getAWSCredentialsFromControllerService(context);
+ if (service_cred) {
+ logger_->log_info("AWS Credentials successfully set from controller
service");
+ return service_cred.value();
+ }
+
+ return minifi::utils::nullopt;
+}
+
+void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext>
&context) {
+ const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+ bool first_property = true;
+ for (const auto &prop_key : dynamic_prop_keys) {
+ std::string prop_value = "";
+ if (context->getDynamicProperty(prop_key, prop_value) &&
!prop_value.empty()) {
+ logger_->log_debug("PutS3Object: DynamicProperty: [%s] -> [%s]",
prop_key, prop_value);
+ put_s3_request_params_.user_metadata_map.emplace(prop_key, prop_value);
+ if (first_property) {
+ user_metadata_ = prop_key + "=" + prop_value;
+ first_property = false;
+ } else {
+ user_metadata_ += "," + prop_key + "=" + prop_value;
+ }
+ }
+ }
+ logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
+}
+
+bool PutS3Object::setProxy(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
+ aws::s3::ProxyOptions proxy;
+ context->getProperty(ProxyHost, proxy.host, flow_file);
+ std::string port_str;
+ if (context->getProperty(ProxyPort, port_str, flow_file) &&
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
+ logger_->log_error("Proxy port invalid");
+ return false;
+ }
+ context->getProperty(ProxyUsername, proxy.username, flow_file);
+ context->getProperty(ProxyPassword, proxy.password, flow_file);
+ if (!proxy.host.empty()) {
+ s3_wrapper_->setProxy(proxy);
+ logger_->log_info("Proxy for PutS3Object was set.");
+ }
+ return true;
+}
+
+void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ if (!context->getProperty(Bucket.getName(), put_s3_request_params_.bucket)
|| put_s3_request_params_.bucket.empty()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or
invalid");
+ }
+ logger_->log_debug("PutS3Object: Bucket [%s]",
put_s3_request_params_.bucket);
+
+ if (!context->getProperty(StorageClass.getName(),
put_s3_request_params_.storage_class)
+ || put_s3_request_params_.storage_class.empty()
+ || STORAGE_CLASSES.find(put_s3_request_params_.storage_class) ==
STORAGE_CLASSES.end()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Class property
missing or invalid");
+ }
+ logger_->log_debug("PutS3Object: Storage Class [%s]",
put_s3_request_params_.storage_class);
+
+ std::string value;
+ if (!context->getProperty(Region.getName(), value) || value.empty() ||
REGIONS.find(value) == REGIONS.end()) {
Review comment:
Just a nitpick: since REGIONS is a std::set, you can rely on count() instead of
comparing the result of find() to end().
##########
File path: extensions/aws/processors/PutS3Object.cpp
##########
@@ -0,0 +1,495 @@
+/**
+ * @file PutS3Object.cpp
+ * PutS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutS3Object.h"
+
+#include <string>
+#include <set>
+#include <memory>
+#include <map>
+
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+#include "utils/MapUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::set<std::string>
PutS3Object::CANNED_ACLS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::CANNED_ACL_MAP));
+const std::set<std::string> PutS3Object::REGIONS({region::AF_SOUTH_1,
region::AP_EAST_1, region::AP_NORTHEAST_1,
+ region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1,
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
+ region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1,
region::EU_CENTRAL_1, region::EU_NORTH_1,
+ region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3,
region::ME_SOUTH_1, region::SA_EAST_1,
+ region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1,
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
+const std::set<std::string>
PutS3Object::STORAGE_CLASSES(minifi::utils::MapUtils::getKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
+const std::set<std::string>
PutS3Object::SERVER_SIDE_ENCRYPTIONS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
+
+const core::Property PutS3Object::ObjectKey(
+ core::PropertyBuilder::createProperty("Object Key")
+ ->withDescription("The key of the S3 object. If none is given the filename
attribute will be used by default.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("The S3 bucket")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ContentType(
+ core::PropertyBuilder::createProperty("Content Type")
+ ->withDescription("Sets the Content-Type HTTP header indicating the type
of content stored in "
+ "the associated object. The value of this header is a
standard MIME type. "
+ "If no content type is provided the default content type
"
+ "\"application/octet-stream\" will be used.")
+ ->supportsExpressionLanguage(true)
+ ->withDefaultValue<std::string>("application/octet-stream")
+ ->build());
+const core::Property PutS3Object::AccessKey(
+ core::PropertyBuilder::createProperty("Access Key")
+ ->withDescription("AWS account access key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::SecretKey(
Review comment:
Why do the keys need to support EL (expression language)?
I don't think these could depend on the flow file or on any expression in any
way; I expect them to be purely static data.
I think the same applies to most of the properties that are validated only once
(in onSchedule).
##########
File path: extensions/aws/processors/PutS3Object.cpp
##########
@@ -0,0 +1,495 @@
+/**
+ * @file PutS3Object.cpp
+ * PutS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutS3Object.h"
+
+#include <string>
+#include <set>
+#include <memory>
+#include <map>
+
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+#include "utils/MapUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::set<std::string>
PutS3Object::CANNED_ACLS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::CANNED_ACL_MAP));
+const std::set<std::string> PutS3Object::REGIONS({region::AF_SOUTH_1,
region::AP_EAST_1, region::AP_NORTHEAST_1,
+ region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1,
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
+ region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1,
region::EU_CENTRAL_1, region::EU_NORTH_1,
+ region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3,
region::ME_SOUTH_1, region::SA_EAST_1,
+ region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1,
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
+const std::set<std::string>
PutS3Object::STORAGE_CLASSES(minifi::utils::MapUtils::getKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
+const std::set<std::string>
PutS3Object::SERVER_SIDE_ENCRYPTIONS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
+
+const core::Property PutS3Object::ObjectKey(
+ core::PropertyBuilder::createProperty("Object Key")
+ ->withDescription("The key of the S3 object. If none is given the filename
attribute will be used by default.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("The S3 bucket")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ContentType(
+ core::PropertyBuilder::createProperty("Content Type")
+ ->withDescription("Sets the Content-Type HTTP header indicating the type
of content stored in "
+ "the associated object. The value of this header is a
standard MIME type. "
+ "If no content type is provided the default content type
"
+ "\"application/octet-stream\" will be used.")
+ ->supportsExpressionLanguage(true)
+ ->withDefaultValue<std::string>("application/octet-stream")
+ ->build());
+const core::Property PutS3Object::AccessKey(
+ core::PropertyBuilder::createProperty("Access Key")
+ ->withDescription("AWS account access key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::SecretKey(
+ core::PropertyBuilder::createProperty("Secret Key")
+ ->withDescription("AWS account secret key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::CredentialsFile(
+ core::PropertyBuilder::createProperty("Credentials File")
+ ->withDescription("Path to a file containing AWS access key and secret key
in properties file format. Properties used: accessKey and secretKey")
+ ->build());
+const core::Property PutS3Object::AWSCredentialsProviderService(
+ core::PropertyBuilder::createProperty("AWS Credentials Provider service")
+ ->withDescription("The name of the AWS Credentials Provider controller
service that is used to obtain AWS credentials.")
+ ->build());
+const core::Property PutS3Object::StorageClass(
+ core::PropertyBuilder::createProperty("Storage Class")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>("Standard")
+ ->withAllowableValues<std::string>(PutS3Object::STORAGE_CLASSES)
+ ->withDescription("AWS S3 Storage Class")
+ ->build());
+const core::Property PutS3Object::Region(
+ core::PropertyBuilder::createProperty("Region")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(region::US_WEST_2)
+ ->withAllowableValues<std::string>(PutS3Object::REGIONS)
+ ->withDescription("AWS Region")
+ ->build());
+const core::Property PutS3Object::CommunicationsTimeout(
+ core::PropertyBuilder::createProperty("Communications Timeout")
+ ->isRequired(true)
+ ->withDefaultValue<core::TimePeriodValue>("30 sec")
+ ->withDescription("")
+ ->build());
+const core::Property PutS3Object::FullControlUserList(
+ core::PropertyBuilder::createProperty("FullControl User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have Full Control for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ReadPermissionUserList(
+ core::PropertyBuilder::createProperty("Read Permission User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have Read Access for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ReadACLUserList(
+ core::PropertyBuilder::createProperty("Read ACL User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have permissions to read "
+ "the Access Control List for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::WriteACLUserList(
+ core::PropertyBuilder::createProperty("Write ACL User List")
+ ->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have permissions to change "
+ "the Access Control List for an object.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::CannedACL(
+ core::PropertyBuilder::createProperty("Canned ACL")
+ ->withDescription("Amazon Canned ACL for an object. Allowed values:
BucketOwnerFullControl, BucketOwnerRead, AuthenticatedRead, "
+ "PublicReadWrite, PublicRead, Private, AwsExecRead; will
be ignored if any other ACL/permission property is specified.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::EndpointOverrideURL(
+ core::PropertyBuilder::createProperty("Endpoint Override URL")
+ ->withDescription("Endpoint URL to use instead of the AWS default
including scheme, host, "
+ "port, and path. The AWS libraries select an endpoint
URL based on the AWS "
+ "region, but this property overrides the selected
endpoint URL, allowing use "
+ "with other S3-compatible endpoints.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ServerSideEncryption(
+ core::PropertyBuilder::createProperty("Server Side Encryption")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>("None")
+ ->withAllowableValues<std::string>(PutS3Object::SERVER_SIDE_ENCRYPTIONS)
+ ->withDescription("Specifies the algorithm used for server side
encryption.")
+ ->build());
+const core::Property PutS3Object::ProxyHost(
+ core::PropertyBuilder::createProperty("Proxy Host")
+ ->withDescription("Proxy host name or IP")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ProxyPort(
+ core::PropertyBuilder::createProperty("Proxy Port")
+ ->withDescription("The port number of the proxy host")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ProxyUsername(
+ core::PropertyBuilder::createProperty("Proxy Username")
+ ->withDescription("Username to set when authenticating against proxy")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property PutS3Object::ProxyPassword(
+ core::PropertyBuilder::createProperty("Proxy Password")
+ ->withDescription("Password to set when authenticating against proxy")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship PutS3Object::Success("success", "FlowFiles are routed
to success relationship");
+const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed
to failure relationship");
+
+void PutS3Object::initialize() {
+ // Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(ObjectKey);
+ properties.insert(Bucket);
+ properties.insert(ContentType);
+ properties.insert(AccessKey);
+ properties.insert(SecretKey);
+ properties.insert(CredentialsFile);
+ properties.insert(AWSCredentialsProviderService);
+ properties.insert(StorageClass);
+ properties.insert(Region);
+ properties.insert(CommunicationsTimeout);
+ properties.insert(FullControlUserList);
+ properties.insert(ReadPermissionUserList);
+ properties.insert(ReadACLUserList);
+ properties.insert(WriteACLUserList);
+ properties.insert(CannedACL);
+ properties.insert(EndpointOverrideURL);
+ properties.insert(ServerSideEncryption);
+ properties.insert(ProxyHost);
+ properties.insert(ProxyPort);
+ properties.insert(ProxyUsername);
+ properties.insert(ProxyPassword);
+ setSupportedProperties(properties);
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Failure);
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromControllerService(const
std::shared_ptr<core::ProcessContext> &context) const {
+ std::string service_name;
+ if (context->getProperty(AWSCredentialsProviderService.getName(),
service_name) && !service_name.empty()) {
+ std::shared_ptr<core::controller::ControllerService> service =
context->getControllerService(service_name);
+ if (nullptr != service) {
+ auto aws_credentials_service =
std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
+ if (aws_credentials_service) {
+ return
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
+ }
+ }
+ }
+ return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromProperties(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) const {
+ std::string access_key;
+ context->getProperty(AccessKey, access_key, flow_file);
+ std::string secret_key;
+ context->getProperty(SecretKey, secret_key, flow_file);
+ if (!access_key.empty() && !secret_key.empty()) {
+ Aws::Auth::AWSCredentials creds(access_key, secret_key);
+ return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+ }
+ return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromFile(const
std::shared_ptr<core::ProcessContext> &context) const {
+ std::string credential_file;
+ if (context->getProperty(CredentialsFile.getName(), credential_file) &&
!credential_file.empty()) {
+ auto properties = std::make_shared<minifi::Properties>();
+ properties->loadConfigureFile(credential_file.c_str());
+ std::string access_key;
+ std::string secret_key;
+ if (properties->getString("accessKey", access_key) && !access_key.empty()
&& properties->getString("secretKey", secret_key) && !secret_key.empty()) {
+ Aws::Auth::AWSCredentials creds(access_key, secret_key);
+ return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+ }
+ }
+ return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentials(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) const {
+ auto prop_cred = getAWSCredentialsFromProperties(context, flow_file);
+ if (prop_cred) {
+ logger_->log_info("AWS Credentials successfully set from properties");
+ return prop_cred.value();
+ }
+
+ auto file_cred = getAWSCredentialsFromFile(context);
+ if (file_cred) {
+ logger_->log_info("AWS Credentials successfully set from file");
+ return file_cred.value();
+ }
+
+ auto service_cred = getAWSCredentialsFromControllerService(context);
+ if (service_cred) {
+ logger_->log_info("AWS Credentials successfully set from controller
service");
+ return service_cred.value();
+ }
+
+ return minifi::utils::nullopt;
+}
+
+void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext>
&context) {
+ const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+ bool first_property = true;
+ for (const auto &prop_key : dynamic_prop_keys) {
+ std::string prop_value = "";
+ if (context->getDynamicProperty(prop_key, prop_value) &&
!prop_value.empty()) {
+ logger_->log_debug("PutS3Object: DynamicProperty: [%s] -> [%s]",
prop_key, prop_value);
+ put_s3_request_params_.user_metadata_map.emplace(prop_key, prop_value);
+ if (first_property) {
+ user_metadata_ = prop_key + "=" + prop_value;
+ first_property = false;
+ } else {
+ user_metadata_ += "," + prop_key + "=" + prop_value;
+ }
+ }
+ }
+ logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
+}
+
+bool PutS3Object::setProxy(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
+ aws::s3::ProxyOptions proxy;
+ context->getProperty(ProxyHost, proxy.host, flow_file);
+ std::string port_str;
+ if (context->getProperty(ProxyPort, port_str, flow_file) &&
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
+ logger_->log_error("Proxy port invalid");
+ return false;
+ }
+ context->getProperty(ProxyUsername, proxy.username, flow_file);
+ context->getProperty(ProxyPassword, proxy.password, flow_file);
+ if (!proxy.host.empty()) {
+ s3_wrapper_->setProxy(proxy);
+ logger_->log_info("Proxy for PutS3Object was set.");
+ }
+ return true;
+}
+
+void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ if (!context->getProperty(Bucket.getName(), put_s3_request_params_.bucket)
|| put_s3_request_params_.bucket.empty()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or
invalid");
+ }
+ logger_->log_debug("PutS3Object: Bucket [%s]",
put_s3_request_params_.bucket);
+
+ if (!context->getProperty(StorageClass.getName(),
put_s3_request_params_.storage_class)
+ || put_s3_request_params_.storage_class.empty()
+ || STORAGE_CLASSES.find(put_s3_request_params_.storage_class) ==
STORAGE_CLASSES.end()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Class property
missing or invalid");
+ }
+ logger_->log_debug("PutS3Object: Storage Class [%s]",
put_s3_request_params_.storage_class);
+
+ std::string value;
+ if (!context->getProperty(Region.getName(), value) || value.empty() ||
REGIONS.find(value) == REGIONS.end()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or
invalid");
+ }
+ s3_wrapper_->setRegion(value);
+ logger_->log_debug("PutS3Object: Region [%s]", value);
+
+ uint64_t timeout_val;
+ if (context->getProperty(CommunicationsTimeout.getName(), value) &&
!value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
+ s3_wrapper_->setTimeout(timeout_val);
+ logger_->log_debug("PutS3Object: Communications Timeout [%d]",
timeout_val);
+ } else {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout
missing or invalid");
+ }
+
+ if (!context->getProperty(ServerSideEncryption.getName(),
put_s3_request_params_.server_side_encryption)
+ || put_s3_request_params_.server_side_encryption.empty()
+ ||
SERVER_SIDE_ENCRYPTIONS.find(put_s3_request_params_.server_side_encryption) ==
SERVER_SIDE_ENCRYPTIONS.end()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Server Side Encryption
property missing or invalid");
+ }
+ logger_->log_debug("PutS3Object: Server Side Encryption [%s]",
put_s3_request_params_.server_side_encryption);
+
+ fillUserMetadata(context);
+}
+
+std::string PutS3Object::parseAccessControlList(const std::string
&comma_separated_list) const {
+ std::string result_list;
+ bool is_first = true;
+ for (const auto& user :
minifi::utils::StringUtils::split(comma_separated_list, ",")) {
+ if (is_first) {
+ is_first = false;
+ } else {
+ result_list += ", ";
Review comment:
This is already implemented in join.
A slightly simpler way of doing this:
```
auto users = minifi::utils::StringUtils::split(comma_separated_list, ",");
for (auto& user : users) {
auto trimmed_user = minifi::utils::StringUtils::trim(user);
if (trimmed_user.find('@') != std::string::npos) {
user = "emailAddress=\"" + trimmed_user + "\"";
} else {
user = "id=" + trimmed_user;
}
}
return minifi::utils::StringUtils::join(", ", users);
```
##########
File path: extensions/aws/utils/AWSSdkLogger.cpp
##########
@@ -0,0 +1,105 @@
+/**
+ * @file AWSSdkLogger.cpp
+ * AWSSdkLogger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AWSSdkLogger.h"
+
+#include "aws/core/utils/logging/LogLevel.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace utils {
+
+Aws::Utils::Logging::LogLevel AWSSdkLogger::GetLogLevel() const {
+ if (logger_->should_log(minifi::core::logging::LOG_LEVEL::trace))
+ return Aws::Utils::Logging::LogLevel::Trace;
+ if (logger_->should_log(minifi::core::logging::LOG_LEVEL::debug))
+ return Aws::Utils::Logging::LogLevel::Debug;
+ if (logger_->should_log(minifi::core::logging::LOG_LEVEL::info))
+ return Aws::Utils::Logging::LogLevel::Info;
+ if (logger_->should_log(minifi::core::logging::LOG_LEVEL::warn))
+ return Aws::Utils::Logging::LogLevel::Warn;
+ if (logger_->should_log(minifi::core::logging::LOG_LEVEL::err))
+ return Aws::Utils::Logging::LogLevel::Error;
+ if (logger_->should_log(minifi::core::logging::LOG_LEVEL::critical))
+ return Aws::Utils::Logging::LogLevel::Fatal;
+ return Aws::Utils::Logging::LogLevel::Off;
+}
+
+void AWSSdkLogger::Log(Aws::Utils::Logging::LogLevel log_level, const char*
tag, const char* format_str, ...) {
+ switch (log_level) {
+ case Aws::Utils::Logging::LogLevel::Trace:
+ logger_->log_trace("[%s] %s", tag, format_str);
+ break;
+ case Aws::Utils::Logging::LogLevel::Debug:
+ logger_->log_debug("[%s] %s", tag, format_str);
+ break;
+ case Aws::Utils::Logging::LogLevel::Info:
+ logger_->log_info("[%s] %s", tag, format_str);
+ break;
+ case Aws::Utils::Logging::LogLevel::Warn:
+ logger_->log_warn("[%s] %s", tag, format_str);
+ break;
+ case Aws::Utils::Logging::LogLevel::Error:
+ logger_->log_error("[%s] %s", tag, format_str);
+ break;
+ case Aws::Utils::Logging::LogLevel::Fatal:
+ logger_->log_error("[%s] %s", tag, format_str);
+ break;
+ default:
+ break;
+ }
+}
+
+void AWSSdkLogger::LogStream(Aws::Utils::Logging::LogLevel log_level, const
char* tag, const Aws::OStringStream &message_stream) {
+ switch (log_level) {
Review comment:
Instead of copy-pasting the logic, can't this simply call the one above?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org