lordgamez commented on a change in pull request #1268: URL: https://github.com/apache/nifi-minifi-cpp/pull/1268#discussion_r811895379
########## File path: extensions/gcp/tests/PutGcsObjectTests.cpp ########## @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "../processors/PutGcsObject.h" +#include "../utils/GCPAttributes.h" +#include "core/Resource.h" +#include "SingleInputTestController.h" +#include "google/cloud/storage/testing/mock_client.h" +#include "google/cloud/storage/internal/object_metadata_parser.h" +#include "google/cloud/storage/retry_policy.h" +#include "google/cloud/storage/testing/canonical_errors.h" + +namespace gcs = ::google::cloud::storage; +namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; + +using PutGcsObject = org::apache::nifi::minifi::extensions::gcp::PutGcsObject; +using GcpCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GcpCredentialsControllerService; +using ResumableUploadRequest = gcs::internal::ResumableUploadRequest; +using ResumableUploadResponse = gcs::internal::ResumableUploadResponse; +using ResumableUploadSession = gcs::internal::ResumableUploadSession; +using ::google::cloud::storage::testing::canonical_errors::TransientError; +using ::google::cloud::storage::testing::canonical_errors::PermanentError; + +namespace { +class 
PutGcsObjectMocked : public PutGcsObject { + using org::apache::nifi::minifi::extensions::gcp::PutGcsObject::PutGcsObject; + public: + gcs::Client getClient(const gcs::ClientOptions&) const override { + return gcs::testing::ClientFromMock(mock_client_, *retry_policy_); + } + std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>(); +}; +REGISTER_RESOURCE(PutGcsObjectMocked, "PutGcsObjectMocked"); +} // namespace + +class PutGcsObjectTests : public ::testing::Test { Review comment: Maybe we could also add system tests to exercise the parts that are mocked out here. I found this [fake GCS server](https://github.com/fsouza/fake-gcs-server), which could be set up as a Docker container; it could be worth a try. ########## File path: cmake/GoogleCloudCpp.cmake ########## @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +include(FetchContent) +set(NLOHMANN_JSON_INCLUDE_DIR "${CMAKE_BINARY_DIR}/_deps/nlohmann/" CACHE STRING "" FORCE) Review comment: The LICENSE and NOTICE files should be updated with all the new dependency libraries. 
########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. 
If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. " + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); Review comment: There is an unnecessary space after `orElse`. ########## File path: extensions/gcp/utils/GCPAttributes.h ########## @@ -0,0 +1,45 @@ +/** Review comment: I think this file could simply live in the `gcp` folder; I don't think the `utils` folder is a good place for it. 
It's also usually a code smell when we shove something in the utils package or folder because we don't know what to do with it :) ########## File path: extensions/gcp/processors/PutGcsObject.h ########## @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <string> +#include <memory> + +#include "core/Processor.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "../controllerservices/GcpCredentialsControllerService.h" +#include "google/cloud/storage/client.h" +#include "google/cloud/storage/retry_policy.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::extensions::gcp { + +class PutGcsObject : public core::Processor { + public: + SMART_ENUM(PredefinedAcl, + (AUTHENTICATED_READ, "authenticatedRead"), + (BUCKET_OWNER_FULL_CONTROL, "bucketOwnerFullControl"), + (BUCKET_OWNER_READ_ONLY, "bucketOwnerRead"), + (PRIVATE, "private"), + (PROJECT_PRIVATE, "projectPrivate"), + (PUBLIC_READ_ONLY, "publicRead"), + (PUBLIC_READ_WRITE, "publicReadWrite")); + + explicit PutGcsObject(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid) { + } + PutGcsObject(const PutGcsObject&) = delete; + PutGcsObject(PutGcsObject&&) = delete; + PutGcsObject& operator=(const PutGcsObject&) = delete; + PutGcsObject& operator=(PutGcsObject&&) = delete; + ~PutGcsObject() override = default; + + EXTENSIONAPI static const core::Property GCPCredentials; + EXTENSIONAPI static const core::Property Bucket; + EXTENSIONAPI static const core::Property ObjectName; + + EXTENSIONAPI static const core::Property NumberOfRetries; + + EXTENSIONAPI static const core::Property ContentType; + EXTENSIONAPI static const core::Property MD5HashLocation; + EXTENSIONAPI static const core::Property Crc32cChecksumLocation; + EXTENSIONAPI static const core::Property EncryptionKey; + EXTENSIONAPI static const core::Property ObjectACL; + EXTENSIONAPI static const core::Property OverwriteObject; + + EXTENSIONAPI static const core::Relationship Success; + EXTENSIONAPI static const core::Relationship Failure; + + void initialize() override; + void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; + void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + bool isSingleThreaded() const override { + return true; Review comment: It seems to me that every trigger creates a separate gcs client with its separate options, so in theory it could be multithreaded, or is there something else that's blocking it? ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); + } + } +} + +void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + + auto gcp_credentials_controller_service = getGCPCredentialsControllerService(*context); + if (!gcp_credentials_controller_service) { + logger_->log_error("Invalid or missing Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto credentials = gcp_credentials_controller_service->getCredentials(); + if (!credentials) { + logger_->log_error("Invalid or missing credentials from Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto ff = session->get(); + if (!ff) { + context->yield(); + return; + } + + auto flow_file = gsl::not_null(std::move(ff)); Review comment: What's the reason for moving this variable from the previous one? Also is it worth checking for not null if we previously had a runtime check for it? ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } Review comment: This is more of a philosophical question, but do we want to change the member value if the optional value is empty? IMO if the parameter is optional, then we should only process it if it has a value and shouldn't change back to default value in case it's missing. Of course if we only use this once it doesn't make a difference. What do you think? ########## File path: extensions/gcp/processors/PutGcsObject.h ########## @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <string> +#include <memory> + +#include "core/Processor.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "../controllerservices/GcpCredentialsControllerService.h" +#include "google/cloud/storage/client.h" +#include "google/cloud/storage/retry_policy.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::extensions::gcp { + +class PutGcsObject : public core::Processor { + public: + SMART_ENUM(PredefinedAcl, + (AUTHENTICATED_READ, "authenticatedRead"), + (BUCKET_OWNER_FULL_CONTROL, "bucketOwnerFullControl"), + (BUCKET_OWNER_READ_ONLY, "bucketOwnerRead"), + (PRIVATE, "private"), + (PROJECT_PRIVATE, "projectPrivate"), + (PUBLIC_READ_ONLY, "publicRead"), + (PUBLIC_READ_WRITE, "publicReadWrite")); + + explicit PutGcsObject(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid) { + } + PutGcsObject(const PutGcsObject&) = delete; + PutGcsObject(PutGcsObject&&) = delete; + PutGcsObject& operator=(const PutGcsObject&) = delete; + PutGcsObject& operator=(PutGcsObject&&) = delete; + ~PutGcsObject() override = default; + + EXTENSIONAPI static const core::Property GCPCredentials; + EXTENSIONAPI static const core::Property Bucket; + EXTENSIONAPI static const core::Property ObjectName; + + EXTENSIONAPI static const core::Property NumberOfRetries; + Review comment: Why is the `NumberOfRetries` separated from the other properties? It might feel lonely. ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. 
If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. " + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); + } + } +} + +void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + + auto gcp_credentials_controller_service = getGCPCredentialsControllerService(*context); + if (!gcp_credentials_controller_service) { + logger_->log_error("Invalid or missing Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto credentials = gcp_credentials_controller_service->getCredentials(); + if (!credentials) { + logger_->log_error("Invalid or missing credentials from Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto ff = session->get(); + if (!ff) { + context->yield(); + return; + } + + auto flow_file = gsl::not_null(std::move(ff)); + auto bucket = context->getProperty(Bucket, flow_file) | utils::orElse([&flow_file] {return flow_file->getAttribute(GCS_BUCKET_ATTR);}); + if (!bucket) { + logger_->log_error("Missing bucket name"); + session->transfer(flow_file, Failure); + return; + } + auto object_name = context->getProperty(ObjectName, flow_file) | utils::orElse([&flow_file] {return flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME);}); + if (!object_name) { + logger_->log_error("Missing object name"); + session->transfer(flow_file, Failure); + return; + } + + gcs::Client client = getClient(gcs::ClientOptions(credentials)); + UploadToGCSCallback callback(client, *bucket, *object_name); + if (auto crc32_checksum_location = context->getProperty(Crc32cChecksumLocation, flow_file)) + callback.setCrc32CChecksumValue(flow_file->getAttribute(*crc32_checksum_location)); + if (auto md5_hash_location = context->getProperty(MD5HashLocation, flow_file)) + callback.setHashValue(flow_file->getAttribute(*md5_hash_location)); + 
callback.setContentType(getContentType(*context, *flow_file)); + callback.setPredefinedAcl(context->getProperty<PredefinedAcl>(ObjectACL)); + callback.setIfGenerationMatch(context->getProperty<bool>(OverwriteObject)); + + callback.setEncryptionKey(encryption_key_); + + session->read(flow_file, &callback); + auto& result = callback.getResult(); + if (!result.ok()) { + flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason()); + flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain()); + logger_->log_error("Failed to upload to Google Cloud Storage %s", result.status().error_info().reason()); + session->transfer(flow_file, Failure); + } else { + setAttributesFromObjectMetadata(*flow_file, *result); Review comment: Minor: I would add a debug log for the success scenario ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); + } + } +} + +void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context && session); + + auto gcp_credentials_controller_service = getGCPCredentialsControllerService(*context); + if (!gcp_credentials_controller_service) { + logger_->log_error("Invalid or missing Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } + + auto credentials = gcp_credentials_controller_service->getCredentials(); + if (!credentials) { + logger_->log_error("Invalid or missing credentials from Google Cloud Platform Credentials Controller Service"); + context->yield(); + return; + } Review comment: Couldn't these be part of the onSchedule phase? ########## File path: extensions/gcp/processors/PutGcsObject.cpp ########## @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "PutGcsObject.h" + +#include <vector> +#include <utility> + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/OptionalUtils.h" +#include "../utils/GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGcsObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType<GcpCredentialsControllerService>() + ->build()); + +const core::Property PutGcsObject::Bucket( + core::PropertyBuilder::createProperty("Bucket Name") + ->withDescription("The name of the Bucket to upload to. If left empty the gcs.bucket attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectName( + core::PropertyBuilder::createProperty("Object Name") + ->withDescription("The name of the object to be uploaded. If left empty the filename attribute will be used by default.") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + ->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue<uint64_t>(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGcsObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("The Content Type of the uploaded object. If not set, \"mime.type\" flow file attribute will be used. 
" + "In case of neither of them is specified, this information will not be sent to the server.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::MD5HashLocation( + core::PropertyBuilder::createProperty("MD5 Hash location") + ->withDescription("The name of the attribute where the md5 hash is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::Crc32cChecksumLocation( + core::PropertyBuilder::createProperty("CRC32 Checksum location") + ->withDescription("The name of the attribute where the crc32 checksum is stored for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGcsObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGcsObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue<bool>(true) + ->build()); + +const core::Relationship PutGcsObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGcsObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { + return result_; + } + + void setHashValue(std::optional<std::string> hash_value_str) { + hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : gcs::MD5HashValue(); + } + + void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) { + crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue(); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> predefined_acl) { + predefined_acl_ = predefined_acl ? gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl(); + } + + void setContentType(std::optional<std::string> content_type_str) { + content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : gcs::ContentType(); + } + + void setIfGenerationMatch(std::optional<bool> overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr<gcs::ObjectMetadata> result_; +}; + +[[nodiscard]] std::optional<std::string> getContentType(const core::ProcessContext& context, const core::FlowFile& flow_file) { + return context.getProperty(PutGcsObject::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); +} + +void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const gcs::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(object_metadata.updated().time_since_epoch().count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} +} // namespace + + +void PutGcsObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + ObjectName, + NumberOfRetries, + ContentType, + MD5HashLocation, + Crc32cChecksumLocation, + EncryptionKey, + ObjectACL, + OverwriteObject}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) { + retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); 
+ } catch (const google::cloud::RuntimeStatusError&) { + logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), *encryption_key); Review comment: Shouldn't we throw an exception here instead? If the invalid input is caused by a user error, I don't think anyone will notice it unless they specifically check the logs. As written, the catch block only logs, so `encryption_key_` is never set and scheduling simply continues without the requested encryption key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
