lordgamez commented on a change in pull request #1268:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1268#discussion_r811895379



##########
File path: extensions/gcp/tests/PutGcsObjectTests.cpp
##########
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/PutGcsObject.h"
+#include "../utils/GCPAttributes.h"
+#include "core/Resource.h"
+#include "SingleInputTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/retry_policy.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using PutGcsObject = org::apache::nifi::minifi::extensions::gcp::PutGcsObject;
+using GcpCredentialsControllerService = 
org::apache::nifi::minifi::extensions::gcp::GcpCredentialsControllerService;
+using ResumableUploadRequest = gcs::internal::ResumableUploadRequest;
+using ResumableUploadResponse = gcs::internal::ResumableUploadResponse;
+using ResumableUploadSession = gcs::internal::ResumableUploadSession;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class PutGcsObjectMocked : public PutGcsObject {
+  using org::apache::nifi::minifi::extensions::gcp::PutGcsObject::PutGcsObject;
+ public:
+  gcs::Client getClient(const gcs::ClientOptions&) const override {
+    return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+  }
+  std::shared_ptr<gcs::testing::MockClient> mock_client_ = 
std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(PutGcsObjectMocked, "PutGcsObjectMocked");
+}  // namespace
+
+class PutGcsObjectTests : public ::testing::Test {

Review comment:
       Maybe we can also add system tests as well just to test the mocked out 
parts. I found this [fake GCS 
server](https://github.com/fsouza/fake-gcs-server) which could be set up as a 
docker container, it could be worth a try.

##########
File path: cmake/GoogleCloudCpp.cmake
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+include(FetchContent)
+set(NLOHMANN_JSON_INCLUDE_DIR "${CMAKE_BINARY_DIR}/_deps/nlohmann/" CACHE 
STRING "" FORCE)

Review comment:
       LICENSE and NOTICE files should be updated with all the new dependecy 
libraries.

##########
File path: extensions/gcp/processors/PutGcsObject.cpp
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutGcsObject.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../utils/GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property PutGcsObject::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud 
Platform credentials.")
+        ->isRequired(true)
+        ->asType<GcpCredentialsControllerService>()
+        ->build());
+
+const core::Property PutGcsObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket Name")
+        ->withDescription("The name of the Bucket to upload to. If left empty 
the gcs.bucket attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectName(
+    core::PropertyBuilder::createProperty("Object Name")
+        ->withDescription("The name of the object to be uploaded. If left 
empty the filename attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before 
routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property PutGcsObject::ContentType(
+    core::PropertyBuilder::createProperty("Content Type")
+        ->withDescription("The Content Type of the uploaded object. If not 
set, \"mime.type\" flow file attribute will be used. "
+                          "In case of neither of them is specified, this 
information will not be sent to the server.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::MD5HashLocation(
+    core::PropertyBuilder::createProperty("MD5 Hash location")
+        ->withDescription("The name of the attribute where the md5 hash is 
stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::Crc32cChecksumLocation(
+    core::PropertyBuilder::createProperty("CRC32 Checksum location")
+        ->withDescription("The name of the attribute where the crc32 checksum 
is stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("An AES256 Encryption Key (encoded in base64) for 
server-side encryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectACL(
+    core::PropertyBuilder::createProperty("Object ACL")
+        ->withDescription("Access Control to be attached to the object 
uploaded. Not providing this will revert to bucket defaults.")
+        ->isRequired(false)
+        ->withAllowableValues(PredefinedAcl::values())
+        ->build());
+
+const core::Property PutGcsObject::OverwriteObject(
+    core::PropertyBuilder::createProperty("Overwrite Object")
+    ->withDescription("If false, the upload to GCS will succeed only if the 
object does not exist.")
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+const core::Relationship PutGcsObject::Success("success", "Files that have 
been successfully written to Google Cloud Storage are transferred to this 
relationship");
+const core::Relationship PutGcsObject::Failure("failure", "Files that could 
not be written to Google Cloud Storage for some reason are transferred to this 
relationship");
+
+
+namespace {
+class UploadToGCSCallback : public InputStreamCallback {
+ public:
+  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::string content;
+    content.resize(stream->size());
+    const auto read_ret = 
stream->read(gsl::make_span(content).as_span<std::byte>());
+    if (io::isError(read_ret)) {
+      return -1;
+    }
+    auto writer = client_.WriteObject(bucket_, key_, hash_value_, 
crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, 
if_generation_match_);
+    writer << content;
+    writer.Close();
+    result_ = writer.metadata();
+    return read_ret;
+  }
+
+  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& 
getResult() const noexcept {
+    return result_;
+  }
+
+  void setHashValue(std::optional<std::string> hash_value_str) {
+    hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : 
gcs::MD5HashValue();
+  }
+
+  void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) {
+    crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue();
+  }
+
+  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+    encryption_key_ = encryption_key;
+  }
+
+  void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> 
predefined_acl) {
+    predefined_acl_ = predefined_acl ? 
gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl();
+  }
+
+  void setContentType(std::optional<std::string> content_type_str) {
+    content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : 
gcs::ContentType();
+  }
+
+  void setIfGenerationMatch(std::optional<bool> overwrite) {
+    if (overwrite.has_value() && overwrite.value() == false) {
+      if_generation_match_ = gcs::IfGenerationMatch(0);
+    } else {
+      if_generation_match_ = gcs::IfGenerationMatch();
+    }
+  }
+
+ private:
+  std::string bucket_;
+  std::string key_;
+  gcs::Client& client_;
+
+  gcs::MD5HashValue hash_value_;
+  gcs::Crc32cChecksumValue crc32c_checksum_;
+  gcs::EncryptionKey encryption_key_;
+  gcs::PredefinedAcl predefined_acl_;
+  gcs::ContentType content_type_;
+  gcs::IfGenerationMatch if_generation_match_;
+
+  google::cloud::StatusOr<gcs::ObjectMetadata> result_;
+};
+
+[[nodiscard]] std::optional<std::string> getContentType(const 
core::ProcessContext& context, const core::FlowFile& flow_file) {
+  return context.getProperty(PutGcsObject::ContentType) | utils::orElse 
([&flow_file] {return flow_file.getAttribute("mime.type");});

Review comment:
       There is an unnecessary whitespace after `orElse`

##########
File path: extensions/gcp/utils/GCPAttributes.h
##########
@@ -0,0 +1,45 @@
+/**

Review comment:
       I think this file can be simply in the `gcp` folder, I don't think utils 
is a good place for this. It's also usually a code smell when we shove 
something in the utils package or folder because we don't know what to do with 
it :)

##########
File path: extensions/gcp/processors/PutGcsObject.h
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "../controllerservices/GcpCredentialsControllerService.h"
+#include "google/cloud/storage/client.h"
+#include "google/cloud/storage/retry_policy.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class PutGcsObject : public core::Processor {
+ public:
+  SMART_ENUM(PredefinedAcl,
+             (AUTHENTICATED_READ, "authenticatedRead"),
+             (BUCKET_OWNER_FULL_CONTROL, "bucketOwnerFullControl"),
+             (BUCKET_OWNER_READ_ONLY, "bucketOwnerRead"),
+             (PRIVATE, "private"),
+             (PROJECT_PRIVATE, "projectPrivate"),
+             (PUBLIC_READ_ONLY, "publicRead"),
+             (PUBLIC_READ_WRITE, "publicReadWrite"));
+
+  explicit PutGcsObject(const std::string& name, const utils::Identifier& uuid 
= {})
+      : core::Processor(name, uuid) {
+  }
+  PutGcsObject(const PutGcsObject&) = delete;
+  PutGcsObject(PutGcsObject&&) = delete;
+  PutGcsObject& operator=(const PutGcsObject&) = delete;
+  PutGcsObject& operator=(PutGcsObject&&) = delete;
+  ~PutGcsObject() override = default;
+
+  EXTENSIONAPI static const core::Property GCPCredentials;
+  EXTENSIONAPI static const core::Property Bucket;
+  EXTENSIONAPI static const core::Property ObjectName;
+
+  EXTENSIONAPI static const core::Property NumberOfRetries;
+
+  EXTENSIONAPI static const core::Property ContentType;
+  EXTENSIONAPI static const core::Property MD5HashLocation;
+  EXTENSIONAPI static const core::Property Crc32cChecksumLocation;
+  EXTENSIONAPI static const core::Property EncryptionKey;
+  EXTENSIONAPI static const core::Property ObjectACL;
+  EXTENSIONAPI static const core::Property OverwriteObject;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) override;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;

Review comment:
       It seems to me that every trigger creates a separate gcs client with its 
separate options, so in theory it could be multithreaded, or is there something 
else that's blocking it?

##########
File path: extensions/gcp/processors/PutGcsObject.cpp
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutGcsObject.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../utils/GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property PutGcsObject::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud 
Platform credentials.")
+        ->isRequired(true)
+        ->asType<GcpCredentialsControllerService>()
+        ->build());
+
+const core::Property PutGcsObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket Name")
+        ->withDescription("The name of the Bucket to upload to. If left empty 
the gcs.bucket attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectName(
+    core::PropertyBuilder::createProperty("Object Name")
+        ->withDescription("The name of the object to be uploaded. If left 
empty the filename attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before 
routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property PutGcsObject::ContentType(
+    core::PropertyBuilder::createProperty("Content Type")
+        ->withDescription("The Content Type of the uploaded object. If not 
set, \"mime.type\" flow file attribute will be used. "
+                          "In case of neither of them is specified, this 
information will not be sent to the server.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::MD5HashLocation(
+    core::PropertyBuilder::createProperty("MD5 Hash location")
+        ->withDescription("The name of the attribute where the md5 hash is 
stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::Crc32cChecksumLocation(
+    core::PropertyBuilder::createProperty("CRC32 Checksum location")
+        ->withDescription("The name of the attribute where the crc32 checksum 
is stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("An AES256 Encryption Key (encoded in base64) for 
server-side encryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectACL(
+    core::PropertyBuilder::createProperty("Object ACL")
+        ->withDescription("Access Control to be attached to the object 
uploaded. Not providing this will revert to bucket defaults.")
+        ->isRequired(false)
+        ->withAllowableValues(PredefinedAcl::values())
+        ->build());
+
+const core::Property PutGcsObject::OverwriteObject(
+    core::PropertyBuilder::createProperty("Overwrite Object")
+    ->withDescription("If false, the upload to GCS will succeed only if the 
object does not exist.")
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+const core::Relationship PutGcsObject::Success("success", "Files that have 
been successfully written to Google Cloud Storage are transferred to this 
relationship");
+const core::Relationship PutGcsObject::Failure("failure", "Files that could 
not be written to Google Cloud Storage for some reason are transferred to this 
relationship");
+
+
+namespace {
+class UploadToGCSCallback : public InputStreamCallback {
+ public:
+  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::string content;
+    content.resize(stream->size());
+    const auto read_ret = 
stream->read(gsl::make_span(content).as_span<std::byte>());
+    if (io::isError(read_ret)) {
+      return -1;
+    }
+    auto writer = client_.WriteObject(bucket_, key_, hash_value_, 
crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, 
if_generation_match_);
+    writer << content;
+    writer.Close();
+    result_ = writer.metadata();
+    return read_ret;
+  }
+
+  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& 
getResult() const noexcept {
+    return result_;
+  }
+
+  void setHashValue(std::optional<std::string> hash_value_str) {
+    hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : 
gcs::MD5HashValue();
+  }
+
+  void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) {
+    crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue();
+  }
+
+  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+    encryption_key_ = encryption_key;
+  }
+
+  void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> 
predefined_acl) {
+    predefined_acl_ = predefined_acl ? 
gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl();
+  }
+
+  void setContentType(std::optional<std::string> content_type_str) {
+    content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : 
gcs::ContentType();
+  }
+
+  void setIfGenerationMatch(std::optional<bool> overwrite) {
+    if (overwrite.has_value() && overwrite.value() == false) {
+      if_generation_match_ = gcs::IfGenerationMatch(0);
+    } else {
+      if_generation_match_ = gcs::IfGenerationMatch();
+    }
+  }
+
+ private:
+  std::string bucket_;
+  std::string key_;
+  gcs::Client& client_;
+
+  gcs::MD5HashValue hash_value_;
+  gcs::Crc32cChecksumValue crc32c_checksum_;
+  gcs::EncryptionKey encryption_key_;
+  gcs::PredefinedAcl predefined_acl_;
+  gcs::ContentType content_type_;
+  gcs::IfGenerationMatch if_generation_match_;
+
+  google::cloud::StatusOr<gcs::ObjectMetadata> result_;
+};
+
+[[nodiscard]] std::optional<std::string> getContentType(const 
core::ProcessContext& context, const core::FlowFile& flow_file) {
+  return context.getProperty(PutGcsObject::ContentType) | utils::orElse 
([&flow_file] {return flow_file.getAttribute("mime.type");});
+}
+
+void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const 
gcs::ObjectMetadata& object_metadata) {
+  flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket());
+  flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name());
+  flow_file.setAttribute(GCS_SIZE_ATTR, 
std::to_string(object_metadata.size()));
+  flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c());
+  flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash());
+  flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, 
std::to_string(object_metadata.updated().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
+  flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link());
+  flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag());
+  flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id());
+  flow_file.setAttribute(GCS_GENERATION, 
std::to_string(object_metadata.generation()));
+  if (object_metadata.has_customer_encryption()) {
+    flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, 
object_metadata.customer_encryption().encryption_algorithm);
+    flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, 
object_metadata.customer_encryption().key_sha256);
+  }
+  if (object_metadata.has_owner()) {
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, 
object_metadata.owner().entity);
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, 
object_metadata.owner().entity_id);
+  }
+}
+}  // namespace
+
+
+void PutGcsObject::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          ObjectName,
+                          NumberOfRetries,
+                          ContentType,
+                          MD5HashLocation,
+                          Crc32cChecksumLocation,
+                          EncryptionKey,
+                          ObjectACL,
+                          OverwriteObject});
+  setSupportedRelationships({Success, Failure});
+}
+
+gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const {
+  return gcs::Client(options, *retry_policy_);
+}
+
+
+void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  if (auto number_of_retries = 
context->getProperty<uint64_t>(NumberOfRetries)) {
+    retry_policy_ = 
std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
+  }
+  if (auto encryption_key = context->getProperty(EncryptionKey)) {
+    try {
+      encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
+    } catch (const google::cloud::RuntimeStatusError&) {
+      logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), 
*encryption_key);
+    }
+  }
+}
+
+void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+
+  auto gcp_credentials_controller_service = 
getGCPCredentialsControllerService(*context);
+  if (!gcp_credentials_controller_service) {
+    logger_->log_error("Invalid or missing Google Cloud Platform Credentials 
Controller Service");
+    context->yield();
+    return;
+  }
+
+  auto credentials = gcp_credentials_controller_service->getCredentials();
+  if (!credentials) {
+    logger_->log_error("Invalid or missing credentials from Google Cloud 
Platform Credentials Controller Service");
+    context->yield();
+    return;
+  }
+
+  auto ff = session->get();
+  if (!ff) {
+    context->yield();
+    return;
+  }
+
+  auto flow_file = gsl::not_null(std::move(ff));

Review comment:
       What's the reason for moving this variable from the previous one? Also 
is it worth checking for not null if we previously had a runtime check for it?

##########
File path: extensions/gcp/processors/PutGcsObject.cpp
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutGcsObject.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../utils/GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property PutGcsObject::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud 
Platform credentials.")
+        ->isRequired(true)
+        ->asType<GcpCredentialsControllerService>()
+        ->build());
+
+const core::Property PutGcsObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket Name")
+        ->withDescription("The name of the Bucket to upload to. If left empty 
the gcs.bucket attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectName(
+    core::PropertyBuilder::createProperty("Object Name")
+        ->withDescription("The name of the object to be uploaded. If left 
empty the filename attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before 
routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property PutGcsObject::ContentType(
+    core::PropertyBuilder::createProperty("Content Type")
+        ->withDescription("The Content Type of the uploaded object. If not 
set, \"mime.type\" flow file attribute will be used. "
+                          "In case of neither of them is specified, this 
information will not be sent to the server.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::MD5HashLocation(
+    core::PropertyBuilder::createProperty("MD5 Hash location")
+        ->withDescription("The name of the attribute where the md5 hash is 
stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::Crc32cChecksumLocation(
+    core::PropertyBuilder::createProperty("CRC32 Checksum location")
+        ->withDescription("The name of the attribute where the crc32 checksum 
is stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("An AES256 Encryption Key (encoded in base64) for 
server-side encryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectACL(
+    core::PropertyBuilder::createProperty("Object ACL")
+        ->withDescription("Access Control to be attached to the object 
uploaded. Not providing this will revert to bucket defaults.")
+        ->isRequired(false)
+        ->withAllowableValues(PredefinedAcl::values())
+        ->build());
+
+const core::Property PutGcsObject::OverwriteObject(
+    core::PropertyBuilder::createProperty("Overwrite Object")
+    ->withDescription("If false, the upload to GCS will succeed only if the 
object does not exist.")
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+const core::Relationship PutGcsObject::Success("success", "Files that have 
been successfully written to Google Cloud Storage are transferred to this 
relationship");
+const core::Relationship PutGcsObject::Failure("failure", "Files that could 
not be written to Google Cloud Storage for some reason are transferred to this 
relationship");
+
+
+namespace {
+class UploadToGCSCallback : public InputStreamCallback {
+ public:
+  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::string content;
+    content.resize(stream->size());
+    const auto read_ret = 
stream->read(gsl::make_span(content).as_span<std::byte>());
+    if (io::isError(read_ret)) {
+      return -1;
+    }
+    auto writer = client_.WriteObject(bucket_, key_, hash_value_, 
crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, 
if_generation_match_);
+    writer << content;
+    writer.Close();
+    result_ = writer.metadata();
+    return read_ret;
+  }
+
+  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& 
getResult() const noexcept {
+    return result_;
+  }
+
+  void setHashValue(std::optional<std::string> hash_value_str) {
+    hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : 
gcs::MD5HashValue();
+  }
+
+  void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) {
+    crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue();
+  }

Review comment:
       This is more of a philosophical question, but do we want to change the 
member value if the optional value is empty? IMO if the parameter is optional, 
then we should only process it if it has a value and shouldn't change back to 
default value in case it's missing. Of course if we only use this once it 
doesn't make a difference. What do you think?

##########
File path: extensions/gcp/processors/PutGcsObject.h
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "../controllerservices/GcpCredentialsControllerService.h"
+#include "google/cloud/storage/client.h"
+#include "google/cloud/storage/retry_policy.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class PutGcsObject : public core::Processor {
+ public:
+  SMART_ENUM(PredefinedAcl,
+             (AUTHENTICATED_READ, "authenticatedRead"),
+             (BUCKET_OWNER_FULL_CONTROL, "bucketOwnerFullControl"),
+             (BUCKET_OWNER_READ_ONLY, "bucketOwnerRead"),
+             (PRIVATE, "private"),
+             (PROJECT_PRIVATE, "projectPrivate"),
+             (PUBLIC_READ_ONLY, "publicRead"),
+             (PUBLIC_READ_WRITE, "publicReadWrite"));
+
+  explicit PutGcsObject(const std::string& name, const utils::Identifier& uuid 
= {})
+      : core::Processor(name, uuid) {
+  }
+  PutGcsObject(const PutGcsObject&) = delete;
+  PutGcsObject(PutGcsObject&&) = delete;
+  PutGcsObject& operator=(const PutGcsObject&) = delete;
+  PutGcsObject& operator=(PutGcsObject&&) = delete;
+  ~PutGcsObject() override = default;
+
+  EXTENSIONAPI static const core::Property GCPCredentials;
+  EXTENSIONAPI static const core::Property Bucket;
+  EXTENSIONAPI static const core::Property ObjectName;
+
+  EXTENSIONAPI static const core::Property NumberOfRetries;
+

Review comment:
       Why is the `NumberOfRetries` separated from the other properties? It 
might feel lonely.

##########
File path: extensions/gcp/processors/PutGcsObject.cpp
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutGcsObject.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../utils/GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property PutGcsObject::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud 
Platform credentials.")
+        ->isRequired(true)
+        ->asType<GcpCredentialsControllerService>()
+        ->build());
+
+const core::Property PutGcsObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket Name")
+        ->withDescription("The name of the Bucket to upload to. If left empty 
the gcs.bucket attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectName(
+    core::PropertyBuilder::createProperty("Object Name")
+        ->withDescription("The name of the object to be uploaded. If left 
empty the filename attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before 
routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property PutGcsObject::ContentType(
+    core::PropertyBuilder::createProperty("Content Type")
+        ->withDescription("The Content Type of the uploaded object. If not 
set, \"mime.type\" flow file attribute will be used. "
+                          "In case of neither of them is specified, this 
information will not be sent to the server.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::MD5HashLocation(
+    core::PropertyBuilder::createProperty("MD5 Hash location")
+        ->withDescription("The name of the attribute where the md5 hash is 
stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::Crc32cChecksumLocation(
+    core::PropertyBuilder::createProperty("CRC32 Checksum location")
+        ->withDescription("The name of the attribute where the crc32 checksum 
is stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("An AES256 Encryption Key (encoded in base64) for 
server-side encryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectACL(
+    core::PropertyBuilder::createProperty("Object ACL")
+        ->withDescription("Access Control to be attached to the object 
uploaded. Not providing this will revert to bucket defaults.")
+        ->isRequired(false)
+        ->withAllowableValues(PredefinedAcl::values())
+        ->build());
+
+const core::Property PutGcsObject::OverwriteObject(
+    core::PropertyBuilder::createProperty("Overwrite Object")
+    ->withDescription("If false, the upload to GCS will succeed only if the 
object does not exist.")
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+const core::Relationship PutGcsObject::Success("success", "Files that have 
been successfully written to Google Cloud Storage are transferred to this 
relationship");
+const core::Relationship PutGcsObject::Failure("failure", "Files that could 
not be written to Google Cloud Storage for some reason are transferred to this 
relationship");
+
+
+namespace {
+class UploadToGCSCallback : public InputStreamCallback {
+ public:
+  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::string content;
+    content.resize(stream->size());
+    const auto read_ret = 
stream->read(gsl::make_span(content).as_span<std::byte>());
+    if (io::isError(read_ret)) {
+      return -1;
+    }
+    auto writer = client_.WriteObject(bucket_, key_, hash_value_, 
crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, 
if_generation_match_);
+    writer << content;
+    writer.Close();
+    result_ = writer.metadata();
+    return read_ret;
+  }
+
+  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& 
getResult() const noexcept {
+    return result_;
+  }
+
+  void setHashValue(std::optional<std::string> hash_value_str) {
+    hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : 
gcs::MD5HashValue();
+  }
+
+  void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) {
+    crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue();
+  }
+
+  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+    encryption_key_ = encryption_key;
+  }
+
+  void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> 
predefined_acl) {
+    predefined_acl_ = predefined_acl ? 
gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl();
+  }
+
+  void setContentType(std::optional<std::string> content_type_str) {
+    content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : 
gcs::ContentType();
+  }
+
+  void setIfGenerationMatch(std::optional<bool> overwrite) {
+    if (overwrite.has_value() && overwrite.value() == false) {
+      if_generation_match_ = gcs::IfGenerationMatch(0);
+    } else {
+      if_generation_match_ = gcs::IfGenerationMatch();
+    }
+  }
+
+ private:
+  std::string bucket_;
+  std::string key_;
+  gcs::Client& client_;
+
+  gcs::MD5HashValue hash_value_;
+  gcs::Crc32cChecksumValue crc32c_checksum_;
+  gcs::EncryptionKey encryption_key_;
+  gcs::PredefinedAcl predefined_acl_;
+  gcs::ContentType content_type_;
+  gcs::IfGenerationMatch if_generation_match_;
+
+  google::cloud::StatusOr<gcs::ObjectMetadata> result_;
+};
+
+[[nodiscard]] std::optional<std::string> getContentType(const 
core::ProcessContext& context, const core::FlowFile& flow_file) {
+  return context.getProperty(PutGcsObject::ContentType) | utils::orElse 
([&flow_file] {return flow_file.getAttribute("mime.type");});
+}
+
+void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const 
gcs::ObjectMetadata& object_metadata) {
+  flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket());
+  flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name());
+  flow_file.setAttribute(GCS_SIZE_ATTR, 
std::to_string(object_metadata.size()));
+  flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c());
+  flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash());
+  flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, 
std::to_string(object_metadata.updated().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
+  flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link());
+  flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag());
+  flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id());
+  flow_file.setAttribute(GCS_GENERATION, 
std::to_string(object_metadata.generation()));
+  if (object_metadata.has_customer_encryption()) {
+    flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, 
object_metadata.customer_encryption().encryption_algorithm);
+    flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, 
object_metadata.customer_encryption().key_sha256);
+  }
+  if (object_metadata.has_owner()) {
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, 
object_metadata.owner().entity);
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, 
object_metadata.owner().entity_id);
+  }
+}
+}  // namespace
+
+
+void PutGcsObject::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          ObjectName,
+                          NumberOfRetries,
+                          ContentType,
+                          MD5HashLocation,
+                          Crc32cChecksumLocation,
+                          EncryptionKey,
+                          ObjectACL,
+                          OverwriteObject});
+  setSupportedRelationships({Success, Failure});
+}
+
+gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const {
+  return gcs::Client(options, *retry_policy_);
+}
+
+
+void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  if (auto number_of_retries = 
context->getProperty<uint64_t>(NumberOfRetries)) {
+    retry_policy_ = 
std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
+  }
+  if (auto encryption_key = context->getProperty(EncryptionKey)) {
+    try {
+      encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
+    } catch (const google::cloud::RuntimeStatusError&) {
+      logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), 
*encryption_key);
+    }
+  }
+}
+
+void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+
+  auto gcp_credentials_controller_service = 
getGCPCredentialsControllerService(*context);
+  if (!gcp_credentials_controller_service) {
+    logger_->log_error("Invalid or missing Google Cloud Platform Credentials 
Controller Service");
+    context->yield();
+    return;
+  }
+
+  auto credentials = gcp_credentials_controller_service->getCredentials();
+  if (!credentials) {
+    logger_->log_error("Invalid or missing credentials from Google Cloud 
Platform Credentials Controller Service");
+    context->yield();
+    return;
+  }
+
+  auto ff = session->get();
+  if (!ff) {
+    context->yield();
+    return;
+  }
+
+  auto flow_file = gsl::not_null(std::move(ff));
+  auto bucket = context->getProperty(Bucket, flow_file) | 
utils::orElse([&flow_file] {return flow_file->getAttribute(GCS_BUCKET_ATTR);});
+  if (!bucket) {
+    logger_->log_error("Missing bucket name");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  auto object_name = context->getProperty(ObjectName, flow_file) | 
utils::orElse([&flow_file] {return 
flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME);});
+  if (!object_name) {
+    logger_->log_error("Missing object name");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  gcs::Client client = getClient(gcs::ClientOptions(credentials));
+  UploadToGCSCallback callback(client, *bucket, *object_name);
+  if (auto crc32_checksum_location = 
context->getProperty(Crc32cChecksumLocation, flow_file))
+    
callback.setCrc32CChecksumValue(flow_file->getAttribute(*crc32_checksum_location));
+  if (auto md5_hash_location = context->getProperty(MD5HashLocation, 
flow_file))
+    callback.setHashValue(flow_file->getAttribute(*md5_hash_location));
+  callback.setContentType(getContentType(*context, *flow_file));
+  callback.setPredefinedAcl(context->getProperty<PredefinedAcl>(ObjectACL));
+  callback.setIfGenerationMatch(context->getProperty<bool>(OverwriteObject));
+
+  callback.setEncryptionKey(encryption_key_);
+
+  session->read(flow_file, &callback);
+  auto& result = callback.getResult();
+  if (!result.ok()) {
+    flow_file->setAttribute(GCS_ERROR_REASON, 
result.status().error_info().reason());
+    flow_file->setAttribute(GCS_ERROR_DOMAIN, 
result.status().error_info().domain());
+    logger_->log_error("Failed to upload to Google Cloud Storage %s", 
result.status().error_info().reason());
+    session->transfer(flow_file, Failure);
+  } else {
+    setAttributesFromObjectMetadata(*flow_file, *result);

Review comment:
       Minor: I would add a debug log for the success scenario

##########
File path: extensions/gcp/processors/PutGcsObject.cpp
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutGcsObject.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../utils/GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property PutGcsObject::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud 
Platform credentials.")
+        ->isRequired(true)
+        ->asType<GcpCredentialsControllerService>()
+        ->build());
+
+const core::Property PutGcsObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket Name")
+        ->withDescription("The name of the Bucket to upload to. If left empty 
the gcs.bucket attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectName(
+    core::PropertyBuilder::createProperty("Object Name")
+        ->withDescription("The name of the object to be uploaded. If left 
empty the filename attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before 
routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property PutGcsObject::ContentType(
+    core::PropertyBuilder::createProperty("Content Type")
+        ->withDescription("The Content Type of the uploaded object. If not 
set, \"mime.type\" flow file attribute will be used. "
+                          "In case of neither of them is specified, this 
information will not be sent to the server.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::MD5HashLocation(
+    core::PropertyBuilder::createProperty("MD5 Hash location")
+        ->withDescription("The name of the attribute where the md5 hash is 
stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::Crc32cChecksumLocation(
+    core::PropertyBuilder::createProperty("CRC32 Checksum location")
+        ->withDescription("The name of the attribute where the crc32 checksum 
is stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("An AES256 Encryption Key (encoded in base64) for 
server-side encryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectACL(
+    core::PropertyBuilder::createProperty("Object ACL")
+        ->withDescription("Access Control to be attached to the object 
uploaded. Not providing this will revert to bucket defaults.")
+        ->isRequired(false)
+        ->withAllowableValues(PredefinedAcl::values())
+        ->build());
+
+const core::Property PutGcsObject::OverwriteObject(
+    core::PropertyBuilder::createProperty("Overwrite Object")
+    ->withDescription("If false, the upload to GCS will succeed only if the 
object does not exist.")
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+const core::Relationship PutGcsObject::Success("success", "Files that have 
been successfully written to Google Cloud Storage are transferred to this 
relationship");
+const core::Relationship PutGcsObject::Failure("failure", "Files that could 
not be written to Google Cloud Storage for some reason are transferred to this 
relationship");
+
+
+namespace {
+class UploadToGCSCallback : public InputStreamCallback {
+ public:
+  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::string content;
+    content.resize(stream->size());
+    const auto read_ret = 
stream->read(gsl::make_span(content).as_span<std::byte>());
+    if (io::isError(read_ret)) {
+      return -1;
+    }
+    auto writer = client_.WriteObject(bucket_, key_, hash_value_, 
crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, 
if_generation_match_);
+    writer << content;
+    writer.Close();
+    result_ = writer.metadata();
+    return read_ret;
+  }
+
+  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& 
getResult() const noexcept {
+    return result_;
+  }
+
+  void setHashValue(std::optional<std::string> hash_value_str) {
+    hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : 
gcs::MD5HashValue();
+  }
+
+  void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) {
+    crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue();
+  }
+
+  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+    encryption_key_ = encryption_key;
+  }
+
+  void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> 
predefined_acl) {
+    predefined_acl_ = predefined_acl ? 
gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl();
+  }
+
+  void setContentType(std::optional<std::string> content_type_str) {
+    content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : 
gcs::ContentType();
+  }
+
+  void setIfGenerationMatch(std::optional<bool> overwrite) {
+    if (overwrite.has_value() && overwrite.value() == false) {
+      if_generation_match_ = gcs::IfGenerationMatch(0);
+    } else {
+      if_generation_match_ = gcs::IfGenerationMatch();
+    }
+  }
+
+ private:
+  std::string bucket_;
+  std::string key_;
+  gcs::Client& client_;
+
+  gcs::MD5HashValue hash_value_;
+  gcs::Crc32cChecksumValue crc32c_checksum_;
+  gcs::EncryptionKey encryption_key_;
+  gcs::PredefinedAcl predefined_acl_;
+  gcs::ContentType content_type_;
+  gcs::IfGenerationMatch if_generation_match_;
+
+  google::cloud::StatusOr<gcs::ObjectMetadata> result_;
+};
+
+[[nodiscard]] std::optional<std::string> getContentType(const 
core::ProcessContext& context, const core::FlowFile& flow_file) {
+  return context.getProperty(PutGcsObject::ContentType) | utils::orElse 
([&flow_file] {return flow_file.getAttribute("mime.type");});
+}
+
+void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const 
gcs::ObjectMetadata& object_metadata) {
+  flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket());
+  flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name());
+  flow_file.setAttribute(GCS_SIZE_ATTR, 
std::to_string(object_metadata.size()));
+  flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c());
+  flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash());
+  flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, 
std::to_string(object_metadata.updated().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
+  flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link());
+  flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag());
+  flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id());
+  flow_file.setAttribute(GCS_GENERATION, 
std::to_string(object_metadata.generation()));
+  if (object_metadata.has_customer_encryption()) {
+    flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, 
object_metadata.customer_encryption().encryption_algorithm);
+    flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, 
object_metadata.customer_encryption().key_sha256);
+  }
+  if (object_metadata.has_owner()) {
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, 
object_metadata.owner().entity);
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, 
object_metadata.owner().entity_id);
+  }
+}
+}  // namespace
+
+
+void PutGcsObject::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          ObjectName,
+                          NumberOfRetries,
+                          ContentType,
+                          MD5HashLocation,
+                          Crc32cChecksumLocation,
+                          EncryptionKey,
+                          ObjectACL,
+                          OverwriteObject});
+  setSupportedRelationships({Success, Failure});
+}
+
+gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const {
+  return gcs::Client(options, *retry_policy_);
+}
+
+
+void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  if (auto number_of_retries = 
context->getProperty<uint64_t>(NumberOfRetries)) {
+    retry_policy_ = 
std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
+  }
+  if (auto encryption_key = context->getProperty(EncryptionKey)) {
+    try {
+      encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
+    } catch (const google::cloud::RuntimeStatusError&) {
+      logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), 
*encryption_key);
+    }
+  }
+}
+
+void PutGcsObject::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+
+  auto gcp_credentials_controller_service = 
getGCPCredentialsControllerService(*context);
+  if (!gcp_credentials_controller_service) {
+    logger_->log_error("Invalid or missing Google Cloud Platform Credentials 
Controller Service");
+    context->yield();
+    return;
+  }
+
+  auto credentials = gcp_credentials_controller_service->getCredentials();
+  if (!credentials) {
+    logger_->log_error("Invalid or missing credentials from Google Cloud 
Platform Credentials Controller Service");
+    context->yield();
+    return;
+  }

Review comment:
       Couldn't these be part of the onSchedule phase?

##########
File path: extensions/gcp/processors/PutGcsObject.cpp
##########
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutGcsObject.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../utils/GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property PutGcsObject::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud 
Platform credentials.")
+        ->isRequired(true)
+        ->asType<GcpCredentialsControllerService>()
+        ->build());
+
+const core::Property PutGcsObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket Name")
+        ->withDescription("The name of the Bucket to upload to. If left empty 
the gcs.bucket attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectName(
+    core::PropertyBuilder::createProperty("Object Name")
+        ->withDescription("The name of the object to be uploaded. If left 
empty the filename attribute will be used by default.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before 
routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property PutGcsObject::ContentType(
+    core::PropertyBuilder::createProperty("Content Type")
+        ->withDescription("The Content Type of the uploaded object. If not 
set, \"mime.type\" flow file attribute will be used. "
+                          "In case of neither of them is specified, this 
information will not be sent to the server.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::MD5HashLocation(
+    core::PropertyBuilder::createProperty("MD5 Hash location")
+        ->withDescription("The name of the attribute where the md5 hash is 
stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::Crc32cChecksumLocation(
+    core::PropertyBuilder::createProperty("CRC32 Checksum location")
+        ->withDescription("The name of the attribute where the crc32 checksum 
is stored for server-side validation.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("An AES256 Encryption Key (encoded in base64) for 
server-side encryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property PutGcsObject::ObjectACL(
+    core::PropertyBuilder::createProperty("Object ACL")
+        ->withDescription("Access Control to be attached to the object 
uploaded. Not providing this will revert to bucket defaults.")
+        ->isRequired(false)
+        ->withAllowableValues(PredefinedAcl::values())
+        ->build());
+
+const core::Property PutGcsObject::OverwriteObject(
+    core::PropertyBuilder::createProperty("Overwrite Object")
+    ->withDescription("If false, the upload to GCS will succeed only if the 
object does not exist.")
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+const core::Relationship PutGcsObject::Success("success", "Files that have 
been successfully written to Google Cloud Storage are transferred to this 
relationship");
+const core::Relationship PutGcsObject::Failure("failure", "Files that could 
not be written to Google Cloud Storage for some reason are transferred to this 
relationship");
+
+
+namespace {
+class UploadToGCSCallback : public InputStreamCallback {
+ public:
+  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::string content;
+    content.resize(stream->size());
+    const auto read_ret = 
stream->read(gsl::make_span(content).as_span<std::byte>());
+    if (io::isError(read_ret)) {
+      return -1;
+    }
+    auto writer = client_.WriteObject(bucket_, key_, hash_value_, 
crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, 
if_generation_match_);
+    writer << content;
+    writer.Close();
+    result_ = writer.metadata();
+    return read_ret;
+  }
+
+  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& 
getResult() const noexcept {
+    return result_;
+  }
+
+  void setHashValue(std::optional<std::string> hash_value_str) {
+    hash_value_ = hash_value_str ? gcs::MD5HashValue(*hash_value_str) : 
gcs::MD5HashValue();
+  }
+
+  void setCrc32CChecksumValue(std::optional<std::string> crc32c_checksum_str) {
+    crc32c_checksum_ = crc32c_checksum_str ? 
gcs::Crc32cChecksumValue(*crc32c_checksum_str) : gcs::Crc32cChecksumValue();
+  }
+
+  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+    encryption_key_ = encryption_key;
+  }
+
+  void setPredefinedAcl(std::optional<PutGcsObject::PredefinedAcl> 
predefined_acl) {
+    predefined_acl_ = predefined_acl ? 
gcs::PredefinedAcl(predefined_acl->toString()) : gcs::PredefinedAcl();
+  }
+
+  void setContentType(std::optional<std::string> content_type_str) {
+    content_type_ = content_type_str ? gcs::ContentType(*content_type_str) : 
gcs::ContentType();
+  }
+
+  void setIfGenerationMatch(std::optional<bool> overwrite) {
+    if (overwrite.has_value() && overwrite.value() == false) {
+      if_generation_match_ = gcs::IfGenerationMatch(0);
+    } else {
+      if_generation_match_ = gcs::IfGenerationMatch();
+    }
+  }
+
+ private:
+  std::string bucket_;
+  std::string key_;
+  gcs::Client& client_;
+
+  gcs::MD5HashValue hash_value_;
+  gcs::Crc32cChecksumValue crc32c_checksum_;
+  gcs::EncryptionKey encryption_key_;
+  gcs::PredefinedAcl predefined_acl_;
+  gcs::ContentType content_type_;
+  gcs::IfGenerationMatch if_generation_match_;
+
+  google::cloud::StatusOr<gcs::ObjectMetadata> result_;
+};
+
+[[nodiscard]] std::optional<std::string> getContentType(const 
core::ProcessContext& context, const core::FlowFile& flow_file) {
+  return context.getProperty(PutGcsObject::ContentType) | utils::orElse 
([&flow_file] {return flow_file.getAttribute("mime.type");});
+}
+
+void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const 
gcs::ObjectMetadata& object_metadata) {
+  flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket());
+  flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name());
+  flow_file.setAttribute(GCS_SIZE_ATTR, 
std::to_string(object_metadata.size()));
+  flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c());
+  flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash());
+  flow_file.setAttribute(GCS_CREATE_TIME_ATTR, 
std::to_string(object_metadata.time_created().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, 
std::to_string(object_metadata.updated().time_since_epoch().count()));
+  flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
+  flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link());
+  flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag());
+  flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id());
+  flow_file.setAttribute(GCS_GENERATION, 
std::to_string(object_metadata.generation()));
+  if (object_metadata.has_customer_encryption()) {
+    flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, 
object_metadata.customer_encryption().encryption_algorithm);
+    flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, 
object_metadata.customer_encryption().key_sha256);
+  }
+  if (object_metadata.has_owner()) {
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, 
object_metadata.owner().entity);
+    flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, 
object_metadata.owner().entity_id);
+  }
+}
+}  // namespace
+
+
+void PutGcsObject::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          ObjectName,
+                          NumberOfRetries,
+                          ContentType,
+                          MD5HashLocation,
+                          Crc32cChecksumLocation,
+                          EncryptionKey,
+                          ObjectACL,
+                          OverwriteObject});
+  setSupportedRelationships({Success, Failure});
+}
+
+gcs::Client PutGcsObject::getClient(const gcs::ClientOptions& options) const {
+  return gcs::Client(options, *retry_policy_);
+}
+
+
+void PutGcsObject::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  if (auto number_of_retries = 
context->getProperty<uint64_t>(NumberOfRetries)) {
+    retry_policy_ = 
std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
+  }
+  if (auto encryption_key = context->getProperty(EncryptionKey)) {
+    try {
+      encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
+    } catch (const google::cloud::RuntimeStatusError&) {
+      logger_->log_error("%s is not in base64: %s", EncryptionKey.getName(), 
*encryption_key);

Review comment:
       Shouldn't we throw an exception here instead? If the wrong input was due 
to a user error I don't think it will be realized if the logs are not 
specifically checked for it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to