szaszm commented on code in PR #1297:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1297#discussion_r873957183
##########
PROCESSORS.md:
##########
@@ -390,6 +393,42 @@ In the list below, the names of required properties appear
in bold. Any other pr
|failure|If file deletion from Azure Data Lake storage fails the flowfile is
transferred to this relationship|
|success|If file deletion from Azure Data Lake storage succeeds the flowfile
is transferred to this relationship|
+## DeleteGCSObject
+
+### Description
+
+Deletes an object from a Google Cloud Bucket.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values
| Description
|
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
+| **Bucket** | ${gcs.bucket} |
| Bucket of the
object.<br>**Supports Expression Language: true**
|
+| **Key** | ${filename} |
| Name of the
object.<br>**Supports Expression Language: true**
|
+| **Number Of retries** | 6 | integers
| How many retry
attempts should be made before routing to the failure relationship.
|
+| **GCP Credentials Provider Service** | |
[GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)
| The Controller Service used to obtain Google Cloud Platform credentials.
|
+| Server Side Encryption Key | |
| An AES256
Encryption Key (encoded in base64) for server-side encryption of the
object.<br>**Supports Expression Language: true** |
+| Generation | |
| The generation of
the Object to download. If null, will download latest generation.
|
+| Endpoint Override URL | |
| Overrides the
default Google Cloud Storage endpoints
|
Review Comment:
Consider calling this "Endpoint URL" and defaulting it to the official GCS.
I think that would improve usability by providing an example value, and it's
just more intuitive in general.
##########
extensions/gcp/processors/ListGCSBucket.cpp:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListGCSBucket.h"
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/OptionalUtils.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property ListGCSBucket::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property ListGCSBucket::UseVersions(
+ core::PropertyBuilder::createProperty("List all versions")
+ ->withDescription("Set this option to `true` to get all the previous
versions separately.")
+ ->withDefaultValue<bool>(false)
+ ->build());
+
+const core::Relationship ListGCSBucket::Success("success", "FlowFiles are
routed to this relationship after a successful Google Cloud Storage
operation.");
+
+void ListGCSBucket::initialize() {
+ setSupportedProperties({GCPCredentials,
+ Bucket,
+ NumberOfRetries,
+ EndpointOverrideURL,
+ UseVersions});
+ setSupportedRelationships({Success});
+}
+
+
+void ListGCSBucket::onSchedule(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ GCSProcessor::onSchedule(context, session_factory);
+ gsl_Expects(context);
+ context->getProperty(Bucket.getName(), bucket_);
+}
+
+void ListGCSBucket::onTrigger(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session && gcp_credentials_);
+
+ gcs::Client client = getClient();
+ auto use_generations = context->getProperty<bool>(UseVersions);
+ gcs::Versions versions = (use_generations && *use_generations) ?
gcs::Versions(true) : gcs::Versions(false);
+ auto objects_in_bucket = client.ListObjects(bucket_, versions);
+ for (auto& object_in_bucket : objects_in_bucket) {
+ if (object_in_bucket.ok()) {
Review Comment:
What if it's not OK? Can we log something?
##########
PROCESSORS.md:
##########
@@ -600,6 +639,43 @@ In the list below, the names of required properties appear
in bold. Any other pr
|success|Files that have been successfully fetched from Azure storage are
transferred to this relationship|
+## FetchGCSObject
+
+### Description
+
+Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with
ListGCSBucket.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values
| Description
|
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
+| **Bucket** | ${gcs.bucket} |
| Bucket of the
object.<br>**Supports Expression Language: true**
|
+| **Key** | ${filename} |
| Name of the
object.<br>**Supports Expression Language: true**
|
+| **Number Of retries** | 6 | integers
| How many retry
attempts should be made before routing to the failure relationship.
|
Review Comment:
"Of" should be lowercase in all processors for consistency with NiFi.
I also see other properties that are inconsistent with NiFi, because NiFi
processors themselves are naming their properties all over the place. I like
the consistent (with each other, not NiFi) naming.
##########
extensions/gcp/processors/FetchGCSObject.cpp:
##########
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchGCSObject.h"
+
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/OptionalUtils.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property FetchGCSObject::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->withDefaultValue("${gcs.bucket}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchGCSObject::Key(
+ core::PropertyBuilder::createProperty("Key")
+ ->withDescription("Name of the object.")
+ ->withDefaultValue("${filename}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchGCSObject::Generation(
+ core::PropertyBuilder::createProperty("Object Generation")
+ ->withDescription("The generation of the Object to download. If null,
will download latest generation.")
+ ->supportsExpressionLanguage(false)
+ ->build());
+
+const core::Property FetchGCSObject::EncryptionKey(
+ core::PropertyBuilder::createProperty("Server Side Encryption Key")
+ ->withDescription("The AES256 Encryption Key (encoded in base64) for
server-side decryption of the object.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship FetchGCSObject::Success("success", "FlowFiles are
routed to this relationship after a successful Google Cloud Storage
operation.");
+const core::Relationship FetchGCSObject::Failure("failure", "FlowFiles are
routed to this relationship if the Google Cloud Storage operation fails.");
+
+
+namespace {
+class FetchFromGCSCallback : public OutputStreamCallback {
Review Comment:
This shouldn't compile after my #1236. Could you rebase and adapt?
The simplest (and my recommended) way is probably to extract members into
local variables and the class into a lambda with captures.
The easiest is to remove the base class, rename process to operator(), and
use std::ref when passing it to session->write.
##########
extensions/gcp/processors/ListGCSBucket.cpp:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListGCSBucket.h"
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/OptionalUtils.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property ListGCSBucket::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property ListGCSBucket::UseVersions(
+ core::PropertyBuilder::createProperty("List all versions")
+ ->withDescription("Set this option to `true` to get all the previous
versions separately.")
+ ->withDefaultValue<bool>(false)
+ ->build());
+
+const core::Relationship ListGCSBucket::Success("success", "FlowFiles are
routed to this relationship after a successful Google Cloud Storage
operation.");
+
+void ListGCSBucket::initialize() {
+ setSupportedProperties({GCPCredentials,
+ Bucket,
+ NumberOfRetries,
+ EndpointOverrideURL,
+ UseVersions});
+ setSupportedRelationships({Success});
+}
+
+
+void ListGCSBucket::onSchedule(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ GCSProcessor::onSchedule(context, session_factory);
+ gsl_Expects(context);
+ context->getProperty(Bucket.getName(), bucket_);
+}
+
+void ListGCSBucket::onTrigger(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session && gcp_credentials_);
+
+ gcs::Client client = getClient();
+ auto use_generations = context->getProperty<bool>(UseVersions);
+ gcs::Versions versions = (use_generations && *use_generations) ?
gcs::Versions(true) : gcs::Versions(false);
Review Comment:
This should be consistent with the property name as well, if possible.
##########
extensions/gcp/processors/DeleteGCSObject.cpp:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteGCSObject.h"
+
+#include "core/Resource.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+#include "utils/OptionalUtils.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property DeleteGCSObject::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->withDefaultValue("${gcs.bucket}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property DeleteGCSObject::Key(
+ core::PropertyBuilder::createProperty("Key")
+ ->withDescription("Name of the object.")
+ ->withDefaultValue("${filename}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property DeleteGCSObject::Generation(
+ core::PropertyBuilder::createProperty("Object Generation")
+ ->withDescription("The generation of the object to be deleted. If
null, will use latest version of the object.")
+ ->supportsExpressionLanguage(false)
+ ->build());
+
+const core::Property DeleteGCSObject::EncryptionKey(
+ core::PropertyBuilder::createProperty("Server Side Encryption Key")
+ ->withDescription("The AES256 Encryption Key (encoded in base64) for
server-side decryption of the object.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship DeleteGCSObject::Success("success", "FlowFiles are
routed to this relationship after a successful Google Cloud Storage
operation.");
+const core::Relationship DeleteGCSObject::Failure("failure", "FlowFiles are
routed to this relationship if the Google Cloud Storage operation fails.");
+
+void DeleteGCSObject::initialize() {
+ setSupportedProperties({GCPCredentials,
+ Bucket,
+ Key,
+ Generation,
+ NumberOfRetries,
+ EncryptionKey,
+ EndpointOverrideURL});
+ setSupportedRelationships({Success, Failure});
+}
+
+void DeleteGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ GCSProcessor::onSchedule(context, session_factory);
+}
Review Comment:
Why override with just a base call?
##########
extensions/gcp/processors/ListGCSBucket.cpp:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListGCSBucket.h"
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/OptionalUtils.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property ListGCSBucket::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property ListGCSBucket::UseVersions(
+ core::PropertyBuilder::createProperty("List all versions")
Review Comment:
I prefer naming the property object and the property the same.
("UseVersions" vs "List all versions")
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]