martinzink commented on a change in pull request #1268: URL: https://github.com/apache/nifi-minifi-cpp/pull/1268#discussion_r812813088
########## File path: cmake/GoogleCloudCpp.cmake ########## @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +include(FetchContent) +set(NLOHMANN_JSON_INCLUDE_DIR "${CMAKE_BINARY_DIR}/_deps/nlohmann/" CACHE STRING "" FORCE) Review comment: good catch I've added them in https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a# ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } Review comment: yeah, I agree changed it in https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a#diff-e4ee3acb6d325c61b7905d6fe7f5addf37e7f550bc41e17d94e66a04d50fdeb8R131-R148 ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. 
If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. " + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + 
->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? 
gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); Review comment: good catch, fixed https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a#diff-e4ee3acb6d325c61b7905d6fe7f5addf37e7f550bc41e17d94e66a04d50fdeb8R175 ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. 
If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. " + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); Review comment: Good point, changed it https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a#diff-e4ee3acb6d325c61b7905d6fe7f5addf37e7f550bc41e17d94e66a04d50fdeb8R238 ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); + } + } +} + +void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + + auto gcp_credentials_controller_service = getGCPCredentialsControllerService(*context); + if (!gcp_credentials_controller_service) { + logger_->log_error("Invalid or missing Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto credentials = gcp_credentials_controller_service->getCredentials(); + if (!credentials) { + logger_->log_error("Invalid or missing credentials from Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } Review comment: Good idea, I've changed it https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a#diff-e4ee3acb6d325c61b7905d6fe7f5addf37e7f550bc41e17d94e66a04d50fdeb8R241 ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); + } + } +} + +void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + + auto gcp_credentials_controller_service = getGCPCredentialsControllerService(*context); + if (!gcp_credentials_controller_service) { + logger_->log_error("Invalid or missing Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto credentials = gcp_credentials_controller_service->getCredentials(); + if (!credentials) { + logger_->log_error("Invalid or missing credentials from Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto ff = session->get(); + if (!ff) { + context->yield(); + return; + } + + auto flow_file = gsl::not_null(std::move(ff)); Review comment: you are right, the not_null is not used anywhere, so I've removed it in https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a#diff-e4ee3acb6d325c61b7905d6fe7f5addf37e7f550bc41e17d94e66a04d50fdeb8L259 ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. 
If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. " + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); + } + } +} + +void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + + auto gcp_credentials_controller_service = getGCPCredentialsControllerService(*context); + if (!gcp_credentials_controller_service) { + logger_->log_error("Invalid or missing Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto credentials = gcp_credentials_controller_service->getCredentials(); + if (!credentials) { + logger_->log_error("Invalid or missing credentials from Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto ff = session->get(); + if (!ff) { + context->yield(); + return; + } + + auto flow_file = gsl::not_null(std::move(ff)); + auto bucket = context->getProperty(Bucket, flow_file) | utils::orElse([&flow_file] {return flow_file->getAttribute(GCS_BUCKET_ATTR);}); + if (!bucket) { + logger_->log_error("Missing bucket name"); + session->transfer(flow_file, Failure); + return; + } + auto object_name = context->getProperty(ObjectName, flow_file) | utils::orElse([&flow_file] {return flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME);}); + if (!object_name) { + logger_->log_error("Missing object name"); + session->transfer(flow_file, Failure); + return; + } + + gcs::Client client = getClient(gcs::ClientOptions(credentials)); + UploadToGCSCallback callback(client, *bucket, *object_name); + if (auto crc32_checksum_location = context->getProperty(Crc32cChecksumLocation, flow_file)) + callback.setCrc32CChecksumValue(flow_file->getAttribute(*crc32_checksum_location)); + if (auto md5_hash_location = context->getProperty(MD5HashLocation, flow_file)) + callback.setHashValue(flow_file->getAttribute(*md5_hash_location)); + 
callback.setContentType(getContentType(*context, *flow_file)); + callback.setPredefinedAcl(context->getProperty<PredefinedAcl>(ObjectACL)); + callback.setIfGenerationMatch(context->getProperty<bool>(OverwriteObject)); + + callback.setEncryptionKey(encryption_key_); + + session->read(flow_file, &callback); + auto& result = callback.getResult(); + if (!result.ok()) { + flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason()); + flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain()); + logger_->log_error("Failed to upload to Google Cloud Storage %s", result.status().error_info().reason()); + session->transfer(flow_file, Failure); + } else { + setAttributesFromObjectMetadata(*flow_file, *result); Review comment: since the `session->transfer(flow_file, Success);` already logs every transfer I don't think that would add any additional information ########## File path: extensions/gcp/processors/PutGcsObject.h ########## @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <string> +#include <memory> + +#include "core/Processor.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "../controllerservices/GcpCredentialsControllerService.h" +#include "google/cloud/storage/client.h" +#include "google/cloud/storage/retry_policy.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::extensions::gcp { + +class PutGcsObject : public core::Processor { + public: + SMART_ENUM(PredefinedAcl, + (AUTHENTICATED_READ, "authenticatedRead"), + (BUCKET_OWNER_FULL_CONTROL, "bucketOwnerFullControl"), + (BUCKET_OWNER_READ_ONLY, "bucketOwnerRead"), + (PRIVATE, "private"), + (PROJECT_PRIVATE, "projectPrivate"), + (PUBLIC_READ_ONLY, "publicRead"), + (PUBLIC_READ_WRITE, "publicReadWrite")); + + explicit PutGcsObject(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid) { + } + PutGcsObject(const PutGcsObject&) = delete; + PutGcsObject(PutGcsObject&&) = delete; + PutGcsObject& operator=(const PutGcsObject&) = delete; + PutGcsObject& operator=(PutGcsObject&&) = delete; + ~PutGcsObject() override = default; + + EXTENSIONAPI static const core::Property GCPCredentials; + EXTENSIONAPI static const core::Property Bucket; + EXTENSIONAPI static const core::Property ObjectName; + + EXTENSIONAPI static const core::Property NumberOfRetries; + Review comment: Moved them closer so no one will feel left out. https://github.com/apache/nifi-minifi-cpp/pull/1268/commits/da24c2f58d250f0cf02f29eb499d1b1a237a366a#diff-d9ac3742ff997db0508135e280c46fc6e01f7232611b8cae67cfe41957f7451bL56-L58 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
