adamdebreceni commented on a change in pull request #915:
URL: https://github.com/apache/nifi-minifi-cpp/pull/915#discussion_r498821457



##########
File path: extensions/aws/processors/PutS3Object.cpp
##########
@@ -0,0 +1,487 @@
+/**
+ * @file PutS3Object.cpp
+ * PutS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutS3Object.h"
+
+#include <string>
+#include <regex>
+#include <set>
+#include <memory>
+
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+namespace {
+
+template<typename T, typename U>
+std::set<T> getMapKeys(const std::map<T,U>& m) {
+  std::set<T> keys;
+  for (const auto& pair : m) {
+    keys.insert(pair.first);
+  }
+  return keys;
+}
+
+}  // namespace
+
+const std::set<std::string> 
PutS3Object::CANNED_ACLS(getMapKeys(minifi::aws::s3::CANNED_ACL_MAP));
+const std::set<std::string> PutS3Object::REGIONS({region::US_GOV_WEST_1, 
region::US_EAST_1, region::US_EAST_2, region::US_WEST_1,
+  region::US_WEST_2, region::EU_WEST_1, region::EU_WEST_2, 
region::EU_CENTRAL_1, region::AP_SOUTH_1,
+  region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2, region::AP_NORTHEAST_1, 
region::AP_NORTHEAST_2,
+  region::SA_EAST_1, region::CN_NORTH_1, region::CA_CENTRAL_1});
+const std::set<std::string> 
PutS3Object::STORAGE_CLASSES(getMapKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
+const std::set<std::string> 
PutS3Object::SERVER_SIDE_ENCRYPTIONS(getMapKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
+
+const core::Property PutS3Object::ObjectKey(
+  core::PropertyBuilder::createProperty("Object Key")
+    ->withDescription("The key of the S3 object")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${filename}")
+    ->build());
+const core::Property PutS3Object::Bucket(
+  core::PropertyBuilder::createProperty("Bucket")
+    ->withDescription("The S3 bucket")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ContentType(
+  core::PropertyBuilder::createProperty("Content Type")
+    ->withDescription("Sets the Content-Type HTTP header indicating the type 
of content stored in "
+                      "the associated object. The value of this header is a 
standard MIME type. "
+                      "If no content type is provided the default content type 
"
+                      "\"application/octet-stream\" will be used.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("application/octet-stream")
+    ->build());
+const core::Property PutS3Object::AccessKey(
+  core::PropertyBuilder::createProperty("Access Key")
+    ->withDescription("AWS account access key")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::SecretKey(
+  core::PropertyBuilder::createProperty("Secret Key")
+    ->withDescription("AWS account secret key")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::CredentialsFile(
+  core::PropertyBuilder::createProperty("Credentials File")
+    ->withDescription("Path to a file containing AWS access key and secret key 
in properties file format. Properties used: accessKey and secretKey")
+    ->build());
+const core::Property PutS3Object::AWSCredentialsProviderService(
+  core::PropertyBuilder::createProperty("AWS Credentials Provider service")
+    ->withDescription("The name of the AWS Credentials Provider controller 
service that is used to obtain AWS credentials.")
+    ->build());
+const core::Property PutS3Object::StorageClass(
+  core::PropertyBuilder::createProperty("Storage Class")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>("Standard")
+    ->withAllowableValues<std::string>(PutS3Object::STORAGE_CLASSES)
+    ->withDescription("AWS S3 Storage Class")
+    ->build());
+const core::Property PutS3Object::Region(
+  core::PropertyBuilder::createProperty("Region")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(region::US_WEST_2)
+    ->withAllowableValues<std::string>(PutS3Object::REGIONS)
+    ->withDescription("AWS Region")
+    ->build());
+const core::Property PutS3Object::CommunicationsTimeout(
+  core::PropertyBuilder::createProperty("Communications Timeout")
+    ->isRequired(true)
+    ->withDefaultValue<core::TimePeriodValue>("30 sec")
+    ->withDescription("")
+    ->build());
+const core::Property PutS3Object::FullControlUserList(
+  core::PropertyBuilder::createProperty("FullControl User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Full Control for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.full.users}")
+    ->build());
+const core::Property PutS3Object::ReadPermissionUserList(
+  core::PropertyBuilder::createProperty("Read Permission User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Read Access for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.read.users}")
+    ->build());
+const core::Property PutS3Object::ReadACLUserList(
+  core::PropertyBuilder::createProperty("Read ACL User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to read the Access Control 
List for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.readacl.users}")
+    ->build());
+const core::Property PutS3Object::WriteACLUserList(
+  core::PropertyBuilder::createProperty("Write ACL User List")
+    ->withDescription("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to change the Access 
Control List for an object.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.writeacl.users}")
+    ->build());
+const core::Property PutS3Object::CannedACL(
+  core::PropertyBuilder::createProperty("Canned ACL")
+    ->withDescription("Amazon Canned ACL for an object. Allowed values: 
BucketOwnerFullControl, BucketOwnerRead, AuthenticatedRead, PublicReadWrite, 
PublicRead, Private, AwsExecRead; will be ignored if any other ACL/permission 
property is specified.")
+    ->supportsExpressionLanguage(true)
+    ->withDefaultValue<std::string>("${s3.permissions.cannedacl}")
+    ->build());
+const core::Property PutS3Object::EndpointOverrideURL(
+  core::PropertyBuilder::createProperty("Endpoint Override URL")
+    ->withDescription("Endpoint URL to use instead of the AWS default 
including scheme, host, "
+                      "port, and path. The AWS libraries select an endpoint 
URL based on the AWS "
+                      "region, but this property overrides the selected 
endpoint URL, allowing use "
+                      "with other S3-compatible endpoints.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ServerSideEncryption(
+  core::PropertyBuilder::createProperty("Server Side Encryption")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>("None")
+    ->withAllowableValues<std::string>(PutS3Object::SERVER_SIDE_ENCRYPTIONS)
+    ->withDescription("Specifies the algorithm used for server side 
encryption.")
+    ->build());
+const core::Property PutS3Object::ProxyHost(
+  core::PropertyBuilder::createProperty("Proxy Host")
+    ->withDescription("Proxy host name or IP")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ProxyPort(
+  core::PropertyBuilder::createProperty("Proxy Port")
+    ->withDescription("The port number of the proxy host")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ProxyUsername(
+    core::PropertyBuilder::createProperty("Proxy Username")
+    ->withDescription("Username to set when authenticating against proxy")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property PutS3Object::ProxyPassword(
+  core::PropertyBuilder::createProperty("Proxy Password")
+    ->withDescription("Password to set when authenticating against proxy")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+const core::Relationship PutS3Object::Success("success", "FlowFiles are routed 
to success relationship");
+const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed 
to failure relationship");
+
+void PutS3Object::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(ObjectKey);
+  properties.insert(Bucket);
+  properties.insert(ContentType);
+  properties.insert(AccessKey);
+  properties.insert(SecretKey);
+  properties.insert(CredentialsFile);
+  properties.insert(AWSCredentialsProviderService);
+  properties.insert(StorageClass);
+  properties.insert(Region);
+  properties.insert(CommunicationsTimeout);
+  properties.insert(FullControlUserList);
+  properties.insert(ReadPermissionUserList);
+  properties.insert(ReadACLUserList);
+  properties.insert(WriteACLUserList);
+  properties.insert(CannedACL);
+  properties.insert(EndpointOverrideURL);
+  properties.insert(ServerSideEncryption);
+  properties.insert(ProxyHost);
+  properties.insert(ProxyPort);
+  properties.insert(ProxyUsername);
+  properties.insert(ProxyPassword);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Failure);
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> 
PutS3Object::getAWSCredentialsFromControllerService(const 
std::shared_ptr<core::ProcessContext> &context) const {
+  std::string service_name;
+  if (context->getProperty(AWSCredentialsProviderService.getName(), 
service_name) && !service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(service_name);
+    if (nullptr != service) {
+      auto aws_credentials_service = 
std::static_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
+      return 
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
+    }
+  }
+  return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> 
PutS3Object::getAWSCredentialsFromProperties(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
+  std::string access_key;
+  context->getProperty(AccessKey, access_key, flow_file);
+  std::string secret_key;
+  context->getProperty(SecretKey, secret_key, flow_file);
+  if (!access_key.empty() && !secret_key.empty()) {
+    Aws::Auth::AWSCredentials creds(access_key, secret_key);
+    return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+  }
+  return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> 
PutS3Object::getAWSCredentialsFromFile(const 
std::shared_ptr<core::ProcessContext> &context) const {
+  std::string credential_file;
+  if (context->getProperty(CredentialsFile.getName(), credential_file) && 
!credential_file.empty()) {
+    auto properties = std::make_shared<minifi::Properties>();
+    properties->loadConfigureFile(credential_file.c_str());
+    std::string access_key;
+    std::string secret_key;
+    if (properties->get("accessKey", access_key) && !access_key.empty() && 
properties->get("secretKey", secret_key) && !secret_key.empty()) {
+      Aws::Auth::AWSCredentials creds(access_key, secret_key);
+      return minifi::utils::make_optional<Aws::Auth::AWSCredentials>(creds);
+    }
+  }
+  return minifi::utils::nullopt;
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> 
PutS3Object::getAWSCredentials(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
+  auto prop_cred = getAWSCredentialsFromProperties(context, flow_file);
+  if (prop_cred) {
+    logger_->log_info("AWS Credentials successfully set from properties");
+    return prop_cred.value();
+  }
+
+  auto file_cred = getAWSCredentialsFromFile(context);
+  if (file_cred) {
+    logger_->log_info("AWS Credentials successfully set from file");
+    return file_cred.value();
+  }
+
+  auto service_cred = getAWSCredentialsFromControllerService(context);
+  if (service_cred) {
+    logger_->log_info("AWS Credentials successfully set from controller 
service");
+    return service_cred.value();
+  }
+
+  return minifi::utils::nullopt;
+}
+
+void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext> 
&context) {
+  const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+  bool first_property = true;
+  for (const auto &prop_key : dynamic_prop_keys) {
+    std::string prop_value = "";
+    if (context->getDynamicProperty(prop_key, prop_value) && 
!prop_value.empty()) {
+      logger_->log_debug("PutS3Object: DynamicProperty: [%s] -> [%s]", 
prop_key, prop_value);
+      put_s3_request_params_.user_metadata_map.emplace(prop_key, prop_value);
+      if (first_property) {
+        user_metadata_ = prop_key + "=" + prop_value;
+        first_property = false;
+      } else {
+        user_metadata_ += "," + prop_key + "=" + prop_value;
+      }
+    }
+  }
+  logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
+}
+
+bool PutS3Object::setProxy(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
+  aws::s3::ProxyOptions proxy;
+  context->getProperty(ProxyHost, proxy.host, flow_file);
+  std::string port_str;
+  if (context->getProperty(ProxyPort, port_str, flow_file) && 
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
+    logger_->log_error("Proxy port invalid");
+    return false;
+  }
+  context->getProperty(ProxyUsername, proxy.username, flow_file);
+  context->getProperty(ProxyPassword, proxy.password, flow_file);
+  if (!proxy.host.empty()) {
+    s3_wrapper_->setProxy(proxy);
+    logger_->log_info("Proxy for PutS3Object was set.");
+  }
+  return true;
+}
+
+void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (!context->getProperty(Bucket.getName(), put_s3_request_params_.bucket) 
|| put_s3_request_params_.bucket.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or 
invalid");
+  }
+  logger_->log_debug("PutS3Object: Bucket [%s]", 
put_s3_request_params_.bucket);
+
+  if (!context->getProperty(StorageClass.getName(), 
put_s3_request_params_.storage_class)
+      || put_s3_request_params_.storage_class.empty()
+      || STORAGE_CLASSES.find(put_s3_request_params_.storage_class) == 
STORAGE_CLASSES.end()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Class property 
missing or invalid");
+  }
+  logger_->log_debug("PutS3Object: Storage Class [%s]", 
put_s3_request_params_.storage_class);
+
+  std::string value;
+  if (!context->getProperty(Region.getName(), value) || value.empty() || 
REGIONS.find(value) == REGIONS.end()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or 
invalid");
+  }
+  s3_wrapper_->setRegion(value);
+  logger_->log_debug("PutS3Object: Region [%s]", value);
+
+  uint64_t timeout_val;
+  if (context->getProperty(CommunicationsTimeout.getName(), value) && 
!value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
+    s3_wrapper_->setTimeout(timeout_val);
+    logger_->log_debug("PutS3Object: Communications Timeout [%d]", 
timeout_val);
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout 
missing or invalid");
+  }
+
+  if (!context->getProperty(ServerSideEncryption.getName(), 
put_s3_request_params_.server_side_encryption)
+      || put_s3_request_params_.server_side_encryption.empty()
+      || 
SERVER_SIDE_ENCRYPTIONS.find(put_s3_request_params_.server_side_encryption) == 
SERVER_SIDE_ENCRYPTIONS.end()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Server Side Encryption 
property missing or invalid");
+  }
+  logger_->log_debug("PutS3Object: Server Side Encryption [%s]", 
put_s3_request_params_.server_side_encryption);
+
+  fillUserMetadata(context);
+}
+
+std::string PutS3Object::parseAccessControlList(const std::string 
&comma_separated_list) const {
+  std::string result_list;
+  bool is_first = true;
+  for (const auto& user : 
minifi::utils::StringUtils::split(comma_separated_list, ",")) {
+    if (is_first) {
+      is_first = false;
+    } else {
+      result_list += ", ";
+    }
+
+    auto trimmed_user = minifi::utils::StringUtils::trim(user);
+    static const std::regex 
email_pattern("(\\w+)(\\.|_)?(\\w*)@(\\w+)(\\.(\\w+))+");
+    if (std::regex_match(trimmed_user, email_pattern)) {

Review comment:
       Doesn't `\w` already match `_`? If so, the `_` alternative in `(\.|_)?` is redundant.
   Also note that this pattern won't match all valid email addresses (if that is the 
purpose); a regex that does is quite... a mouthful.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to