fgerlits commented on a change in pull request #1268: URL: https://github.com/apache/nifi-minifi-cpp/pull/1268#discussion_r827905433
########## File path: extensions/gcp/processors/PutGCSObject.cpp ########## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGCSObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "io/StreamPipe.h" +#include "utils/OptionalUtils.h" +#include "../GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGCSObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GCPCredentialsControllerService>() + ->build()); + +const core::Property PutGCSObject::Bucket( + core::PropertyBuilder::createProperty("Bucket") + ->withDescription("Bucket of the object.") + ->withDefaultValue("${gcs.bucket}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::Key( + core::PropertyBuilder::createProperty("Name of the object.") + ->withDescription("Name of the object.") + 
->withDefaultValue("${filename}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGCSObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("Content Type for the file, i.e. text/plain ") + ->isRequired(false) + ->withDefaultValue("${mime.type}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::MD5Hash( + core::PropertyBuilder::createProperty("MD5 Hash") + ->withDescription("MD5 Hash (encoded in Base64) of the file for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::Crc32cChecksum( + core::PropertyBuilder::createProperty("CRC32C Checksum") + ->withDescription("CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGCSObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Property PutGCSObject::EndpointOverrideURL( + core::PropertyBuilder::createProperty("Endpoint Override URL") + ->withDescription("Overrides the default Google Cloud Storage endpoints") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Relationship PutGCSObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGCSObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(const std::string& hash_value_str) { + hash_value_ = 
gcs::MD5HashValue(hash_value_str); + } + + void setCrc32CChecksumValue(const std::string& crc32c_checksum_str) { + crc32c_checksum_ = gcs::Crc32cChecksumValue(crc32c_checksum_str); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(PutGCSObject::PredefinedAcl predefined_acl) { + predefined_acl_ = gcs::PredefinedAcl(predefined_acl.toString()); + } + + void setContentType(const std::string& content_type_str) { + content_type_ = gcs::ContentType(content_type_str); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) { + std::string service_name; + if (context.getProperty(PutGCSObject::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) + return std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name))->getCredentials(); + return nullptr; +} +} // namespace + + +void PutGCSObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + Key, + NumberOfRetries, + ContentType, + MD5Hash, + Crc32cChecksum, + EncryptionKey, + ObjectACL, + OverwriteObject, + EndpointOverrideURL}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGCSObject::getClient(const gcs::ClientOptions& options) const { + return 
gcs::Client(options, *retry_policy_); +} + + +void PutGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); + } catch (const google::cloud::RuntimeStatusError&) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, EncryptionKey.getName() + " is not in base64: " + *encryption_key); Review comment: Not sure how paranoid we want to be, but this will print the (incorrect, but still probably secret) encryption key to the log file. Less info may be better in this case: ```suggestion throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName()); ``` ########## File path: extensions/gcp/processors/PutGCSObject.cpp ########## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGCSObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "io/StreamPipe.h" +#include "utils/OptionalUtils.h" +#include "../GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGCSObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GCPCredentialsControllerService>() + ->build()); + +const core::Property PutGCSObject::Bucket( + core::PropertyBuilder::createProperty("Bucket") + ->withDescription("Bucket of the object.") + ->withDefaultValue("${gcs.bucket}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::Key( + core::PropertyBuilder::createProperty("Name of the object.") + ->withDescription("Name of the object.") + ->withDefaultValue("${filename}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGCSObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("Content Type for the file, i.e. 
text/plain ") + ->isRequired(false) + ->withDefaultValue("${mime.type}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::MD5Hash( + core::PropertyBuilder::createProperty("MD5 Hash") + ->withDescription("MD5 Hash (encoded in Base64) of the file for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::Crc32cChecksum( + core::PropertyBuilder::createProperty("CRC32C Checksum") + ->withDescription("CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGCSObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Property PutGCSObject::EndpointOverrideURL( + core::PropertyBuilder::createProperty("Endpoint Override URL") + ->withDescription("Overrides the default Google Cloud Storage endpoints") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Relationship PutGCSObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGCSObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(const std::string& hash_value_str) { + hash_value_ = 
gcs::MD5HashValue(hash_value_str); + } + + void setCrc32CChecksumValue(const std::string& crc32c_checksum_str) { + crc32c_checksum_ = gcs::Crc32cChecksumValue(crc32c_checksum_str); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(PutGCSObject::PredefinedAcl predefined_acl) { + predefined_acl_ = gcs::PredefinedAcl(predefined_acl.toString()); + } + + void setContentType(const std::string& content_type_str) { + content_type_ = gcs::ContentType(content_type_str); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) { + std::string service_name; + if (context.getProperty(PutGCSObject::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) + return std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name))->getCredentials(); Review comment: It would be safer to check if the `dynamic_cast` was successful, and return `nullptr` if it wasn't, instead of crashing. Logging could be useful, too, to distinguish the two cases (missing/empty property or incorrect type). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
