This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 07781b8 MINIFICPP-1390 Create DeleteS3Object processor
07781b8 is described below
commit 07781b8f017e9d6f6de5e4857a91f04e5b9e88f8
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Jan 5 13:47:07 2021 +0100
MINIFICPP-1390 Create DeleteS3Object processor
Signed-off-by: Arpad Boda <[email protected]>
This closes #931
---
PROCESSORS.md | 36 ++-
README.md | 44 +--
docker/test/integration/minifi/__init__.py | 20 ++
docker/test/integration/minifi/test/__init__.py | 7 +-
.../{test_puts3object.py => test_s3.py} | 55 +++-
extensions/aws/AWSLoader.h | 4 +
extensions/aws/processors/DeleteS3Object.cpp | 88 ++++++
extensions/aws/processors/DeleteS3Object.h | 81 ++++++
extensions/aws/processors/PutS3Object.cpp | 231 +---------------
extensions/aws/processors/PutS3Object.h | 83 ++----
extensions/aws/processors/S3Processor.cpp | 262 ++++++++++++++++++
.../processors/{PutS3Object.h => S3Processor.h} | 111 ++------
extensions/aws/s3/S3Wrapper.cpp | 22 +-
extensions/aws/s3/S3Wrapper.h | 1 +
extensions/aws/s3/S3WrapperBase.cpp | 10 +
extensions/aws/s3/S3WrapperBase.h | 4 +
libminifi/test/aws-tests/DeleteS3ObjectTests.cpp | 133 +++++++++
libminifi/test/aws-tests/MockS3Wrapper.h | 80 ++++++
libminifi/test/aws-tests/PutS3ObjectTests.cpp | 301 ++++-----------------
libminifi/test/aws-tests/S3TestsFixture.h | 167 ++++++++++++
20 files changed, 1082 insertions(+), 658 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index f94b7d7..d9ea56e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -10,6 +10,7 @@
- [CaptureRTSPFrame](#capturertspframe)
- [CompressContent](#compresscontent)
- [ConsumeMQTT](#consumemqtt)
+- [DeleteS3Object](#deletes3object)
- [ExecuteProcess](#executeprocess)
- [ExecutePythonProcessor](#executepythonprocessor)
- [ExecuteSQL](#executesql)
@@ -208,6 +209,39 @@ In the list below, the names of required properties appear
in bold. Any other pr
|success|FlowFiles that are sent successfully to the destination are
transferred to this relationship|
+## DeleteS3Object
+
+### Description
+
+Deletes FlowFiles on an Amazon S3 Bucket. If attempting to delete a file that
does not exist, FlowFile is routed to success.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Object Key|||The key of the S3 object. If none is given the filename
attribute will be used by default.<br/>**Supports Expression Language: true**|
+|**Bucket**|||The S3 bucket<br/>**Supports Expression Language: true**|
+|Access Key|||AWS account access key<br/>**Supports Expression Language:
true**|
+|Secret Key|||AWS account secret key<br/>**Supports Expression Language:
true**|
+|Credentials File|||Path to a file containing AWS access key and secret key in
properties file format. Properties used: accessKey and secretKey|
+|AWS Credentials Provider service|||The name of the AWS Credentials Provider
controller service that is used to obtain AWS credentials.|
+|**Region**|us-west-2|af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-southeast-1<br/>ap-southeast-2<br/>ca-central-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-north-1<br/>eu-south-1<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>me-south-1<br/>sa-east-1<br/>us-east-1<br/>us-east-2<br/>us-gov-east-1<br/>us-gov-west-1<br/>us-west-1<br/>us-west-2|AWS
Region|
+|**Communications Timeout**|30 sec||Sets the timeout of the communication
between the AWS server and the client|
+|Endpoint Override URL|||Endpoint URL to use instead of the AWS default
including scheme, host, port, and path. The AWS libraries select an endpoint
URL based on the AWS region, but this property overrides the selected endpoint
URL, allowing use with other S3-compatible endpoints.<br/>**Supports Expression
Language: true**|
+|Proxy Host|||Proxy host name or IP<br/>**Supports Expression Language: true**|
+|Proxy Port|||The port number of the proxy host<br/>**Supports Expression
Language: true**|
+|Proxy Username|||Username to set when authenticating against
proxy<br/>**Supports Expression Language: true**|
+|Proxy Password|||Password to set when authenticating against
proxy<br/>**Supports Expression Language: true**|
+|Version|||The Version of the Object to delete<br/>**Supports Expression
Language: true**|
+### Relationships
+
+| Name | Description |
+| - | - |
+|failure|FlowFiles are routed to failure relationship|
+|success|FlowFiles are routed to success relationship|
+
+
## ExecuteProcess
### Description
@@ -964,7 +998,7 @@ In the list below, the names of required properties appear
in bold. Any other pr
|AWS Credentials Provider service|||The name of the AWS Credentials Provider
controller service that is used to obtain AWS credentials.|
|**Storage
Class**|Standard|Standard<br/>ReducedRedundancy<br/>StandardIA<br/>OnezoneIA<br/>IntelligentTiering<br/>Glacier<br/>DeepArchive|AWS
S3 Storage Class|
|**Region**|us-west-2|af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-southeast-1<br/>ap-southeast-2<br/>ca-central-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-north-1<br/>eu-south-1<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>me-south-1<br/>sa-east-1<br/>us-east-1<br/>us-east-2<br/>us-gov-east-1<br/>us-gov-west-1<br/>us-west-1<br/>us-west-2|AWS
Region|
-|**Communications Timeout**|30 sec|||
+|**Communications Timeout**|30 sec||Sets the timeout of the communication
between the AWS server and the client|
|FullControl User List|||A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have Full Control for an
object.<br/>**Supports Expression Language: true**|
|Read Permission User List|||A comma-separated list of Amazon User ID's or
E-mail addresses that specifies who should have Read Access for an
object.<br/>**Supports Expression Language: true**|
|Read ACL User List|||A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have permissions to read the Access Control
List for an object.<br/>**Supports Expression Language: true**|
diff --git a/README.md b/README.md
index 5d04b3f..28d39d1 100644
--- a/README.md
+++ b/README.md
@@ -59,21 +59,21 @@ A subset of the Apache NiFi [Expression
Language](EXPRESSIONS.md) is supported.
MiNiFi - C++ supports the following C++ processors:
-The following table lists the base set of processors.
+The following table lists the base set of processors.
| Extension Set | Processors |
| ------------- |:-------------|
| **Base** |
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](PROCESSORS.md#putfile)<br/>[RetryFlowFile](PROCESSOR
[...]
-The next table outlines CMAKE flags that correspond with MiNiFi extensions.
Extensions that are enabled by default ( such as CURL ), can be disabled with
the respective CMAKE flag on the command line.
+The next table outlines CMAKE flags that correspond with MiNiFi extensions.
Extensions that are enabled by default ( such as CURL ), can be disabled with
the respective CMAKE flag on the command line.
-Through JNI extensions you can run NiFi processors using NARs. The JNI
extension set allows you to run these Java processors. MiNiFi C++ will favor
C++ implementations over Java implements. In the case where a processor is
implemented in either language, the one in C++ will be selected; however, will
remain transparent to the consumer.
+Through JNI extensions you can run NiFi processors using NARs. The JNI
extension set allows you to run these Java processors. MiNiFi C++ will favor
C++ implementations over Java implements. In the case where a processor is
implemented in either language, the one in C++ will be selected; however, will
remain transparent to the consumer.
| Extension Set | Processors | CMAKE Flag |
| ------------- |:-------------| :-----|
| Archive Extensions |
[ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)
| -DBUILD_LIBARCHIVE=ON |
-| AWS | [AWSCredentialsService](CONTROLLERS.md#awsCredentialsService) |
-DENABLE_AWS=ON |
+| AWS |
[AWSCredentialsService](CONTROLLERS.md#awsCredentialsService)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)
| -DENABLE_AWS=ON |
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON |
| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON |
| GPS | GetGPS | -DENABLE_GPS=ON |
@@ -92,7 +92,7 @@ Through JNI extensions you can run NiFi processors using
NARs. The JNI extension
| USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera) |
-DENABLE_USB_CAMERA=ON |
| Windows Event Log (Windows only) |
CollectorInitiatedSubscription<br/>ConsumeWindowsEventLog<br/>TailEventLog |
-DENABLE_WEL=ON |
- Please see our [Python guide](extensions/script/README.md) on how to write
Python processors and use them within MiNiFi C++.
+ Please see our [Python guide](extensions/script/README.md) on how to write
Python processors and use them within MiNiFi C++.
## Caveats
* We follow semver with regards to API compatibility, but no ABI compatibility
is provided. See [semver's website](https://semver.org/) for more information
@@ -121,11 +121,11 @@ Through JNI extensions you can run NiFi processors using
NARs. The JNI extension
The following utilities are needed to build external projects, when bundled
versions of LibreSSL, cURL, or zlib are used:
-* patch
-* autoconf
-* automake
-* libtool
-
+* patch
+* autoconf
+* automake
+* libtool
+
**NOTE** if Lua support is enabled, then a C++ compiler with support for
c++-14 must be used. If using GCC, version 6.x
or greater is recommended.
@@ -133,7 +133,7 @@ or greater is recommended.
**NOTE** if Kafka support is enabled, a recent version of a compiler
supporting C++-11 regexes must be used. GCC versions >= 4.9.x are recommended.
-**NOTE** if Expression Language support is enabled, FlexLexer must be in the
include path and the version must be compatible with the version of flex used
when generating lexer sources. Lexer source generation is automatically
performed during CMake builds. To re-generate the sources, remove:
+**NOTE** if Expression Language support is enabled, FlexLexer must be in the
include path and the version must be compatible with the version of flex used
when generating lexer sources. Lexer source generation is automatically
performed during CMake builds. To re-generate the sources, remove:
* extensions/expression-language/Parser.cpp
* extensions/expression-language/Parser.hpp
@@ -194,7 +194,7 @@ On all distributions please use -DUSE_SHARED_LIBS=OFF to
statically link zlib, l
#### Windows
Build and Installation has been tested with Windows 10 using Visual Studio
2017. You can build
and create an MSI via the CPACK command. This requires the installation of
the WiX
- toolset (http://wixtoolset.org/). To do this, open up a prompt into your
build directory and
+ toolset (http://wixtoolset.org/). To do this, open up a prompt into your
build directory and
type 'cpack' . The CPACK command will automatically generate and provide you
a path to the distributable
msi file. See [Windows.md](Windows.md) for more details.
@@ -218,7 +218,7 @@ The needed dependencies can be installed with the following
commands for:
##### Yum based Linux Distributions
**NOTE** if a newer compiler is required, such as when Lua support is enabled,
it is recommended to use a newer compiler
-using a devtools-* package from the Software Collections (SCL).
+using a devtools-* package from the Software Collections (SCL).
```
# ~/Development/code/apache/nifi-minifi-cpp on git:master
@@ -373,11 +373,11 @@ $ # It is recommended that you install bison from source
as HomeBrew now uses an
* Extension cannot be installed due to
version of cmake or other software, or
incompatibility with other extensions
-
- Enter choice [ A - W or 1-4 ]
+
+ Enter choice [ A - W or 1-4 ]
```
-- Boostrap now saves state between runs. State will automatically be saved.
Provide -c or --clear to clear this state. The -i option provides a guided menu
install with the ability to change
+- Boostrap now saves state between runs. State will automatically be saved.
Provide -c or --clear to clear this state. The -i option provides a guided menu
install with the ability to change
advanced features.
### Building
@@ -392,7 +392,7 @@ advanced features.
- Perform a `cmake ..` to generate the project files
- Optionally disable or enable extensions. Please visit our guide
[extensions guide](Extensions.md) for flags or our wiki entry on
- [customizing
builds](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143)
for more information on this topic.
+ [customizing
builds](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143)
for more information on this topic.
```
# ~/Development/code/apache/nifi-minifi-cpp on git:master
$ cmake ..
@@ -513,7 +513,7 @@ from the project directory. Further instructions are
available in the [Snapcraft
### Configuring
The 'conf' directory in the root contains a template config.yml document,
minifi.properties, and minifi-log.properties. Please see our [Configuration
document](CONFIGURE.md) for details on how to configure agents.
-
+
### Running
After completing a [build](#building), the application can be run by issuing
the following from :
@@ -521,7 +521,7 @@ After completing a [build](#building), the application can
be run by issuing the
By default, this will make use of a config.yml located in the conf directory.
This configuration file location can be altered by adjusting the property
`nifi.flow.configuration.file` in minifi.properties located in the conf
directory.
-### Stopping
+### Stopping
MiNiFi can then be stopped by issuing:
@@ -537,12 +537,12 @@ MiNiFi can also be installed as a system service using
minifi.sh with an optiona
MiNiFi C++ comes with a deployment script. This will build and package minifi.
Additionally, a file named build_output will be
created within the build directory that contains a manifest of build artifacts.
- $ deploy.sh <build identifier>
+ $ deploy.sh <build identifier>
The build identifier will be carried with the deployed binary for the
configuration you specify. By default all extensions will be built.
-On Windows it is suggested that MSI be used for installation.
-
+On Windows it is suggested that MSI be used for installation.
+
### Extensions
Please see [Extensions.md](Extensions.md) on how to build and run
conditionally built dependencies and extensions.
diff --git a/docker/test/integration/minifi/__init__.py
b/docker/test/integration/minifi/__init__.py
index 18efddd..b64d541 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -608,6 +608,26 @@ class PutS3Object(Processor):
},
auto_terminate=['success'])
+class DeleteS3Object(Processor):
+ def __init__(self,
+ proxy_host='',
+ proxy_port='',
+ proxy_username='',
+ proxy_password=''):
+ super(DeleteS3Object, self).__init__('DeleteS3Object',
+ properties={
+ 'Object Key': 'test_object_key',
+ 'Bucket': 'test_bucket',
+ 'Access Key': 'test_access_key',
+ 'Secret Key': 'test_secret',
+ 'Endpoint Override URL':
"http://s3-server:9090",
+ 'Proxy Host': proxy_host,
+ 'Proxy Port': proxy_port,
+ 'Proxy Username': proxy_username,
+ 'Proxy Password': proxy_password,
+ },
+ auto_terminate=['success'])
+
class InputPort(Connectable):
def __init__(self, name=None, remote_process_group=None):
super(InputPort, self).__init__(name=name)
diff --git a/docker/test/integration/minifi/test/__init__.py
b/docker/test/integration/minifi/test/__init__.py
index 6dbef12..bf64472 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -205,7 +205,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
return url in output and \
((output.count("TCP_DENIED/407") != 0 and \
output.count("TCP_MISS/200") == output.count("TCP_DENIED/407"))
or \
- output.count("TCP_DENIED/407") == 0 and "TCP_MISS/200" in output)
+ output.count("TCP_DENIED/407") == 0 and "TCP_MISS" in output)
def check_s3_server_object_data(self):
s3_mock_dir = subprocess.check_output(["docker", "exec", "s3-server",
"find", "/tmp/", "-type", "d", "-name",
"s3mock*"]).decode(sys.stdout.encoding).strip()
@@ -218,6 +218,11 @@ class DockerTestCluster(SingleNodeDockerCluster):
server_metadata = json.loads(metadata_json)
return server_metadata["contentType"] == content_type and metadata ==
server_metadata["userMetadata"]
+ def is_s3_bucket_empty(self):
+ s3_mock_dir = subprocess.check_output(["docker", "exec", "s3-server",
"find", "/tmp/", "-type", "d", "-name",
"s3mock*"]).decode(sys.stdout.encoding).strip()
+ ls_result = subprocess.check_output(["docker", "exec", "s3-server",
"ls", s3_mock_dir + "/test_bucket/"]).decode(sys.stdout.encoding)
+ return not ls_result
+
def rm_out_child(self, dir):
logging.info('Removing %s from output folder',
os.path.join(self.tmp_test_output_dir, dir))
shutil.rmtree(os.path.join(self.tmp_test_output_dir, dir))
diff --git a/docker/test/integration/test_puts3object.py
b/docker/test/integration/test_s3.py
similarity index 54%
rename from docker/test/integration/test_puts3object.py
rename to docker/test/integration/test_s3.py
index 618fe7a..d961003 100644
--- a/docker/test/integration/test_puts3object.py
+++ b/docker/test/integration/test_s3.py
@@ -17,7 +17,7 @@ from minifi import *
from minifi.test import *
-def test_puts3object():
+def test_put_s3_object():
"""
Verify delivery of S3 object to AWS server
"""
@@ -35,7 +35,7 @@ def test_puts3object():
assert cluster.check_s3_server_object_data()
assert cluster.check_s3_server_object_metadata()
-def test_puts3object_proxy():
+def test_put_s3_object_proxy():
"""
Verify delivery of S3 object to AWS server through proxy server
"""
@@ -58,3 +58,54 @@ def test_puts3object_proxy():
assert
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
assert cluster.check_s3_server_object_data()
assert cluster.check_s3_server_object_metadata()
+
+def test_delete_s3_object():
+ """
+ Verify deletion of S3 object
+ """
+ flow = (GetFile('/tmp/input') >> PutS3Object() \
+ >> LogAttribute() \
+ >> DeleteS3Object() \
+ >> PutFile('/tmp/output/success'))
+
+ with DockerTestCluster(SingleFileOutputValidator('test',
subdir='success')) as cluster:
+ cluster.put_test_data('test_data')
+ cluster.deploy_flow(None, engine='s3-server')
+ cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
+ assert cluster.check_output(60)
+ assert cluster.is_s3_bucket_empty()
+
+def test_delete_s3_non_existing_object():
+ """
+ Verify deletion of a non-existing S3 object should succeed
+ """
+ flow = (GetFile('/tmp/input')
+ >> DeleteS3Object() \
+ >> PutFile('/tmp/output/success'))
+
+ with DockerTestCluster(SingleFileOutputValidator('test',
subdir='success')) as cluster:
+ cluster.put_test_data('test_data')
+ cluster.deploy_flow(None, engine='s3-server')
+ cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
+ assert cluster.check_output(60)
+
+def test_delete_s3_object_proxy():
+ """
+ Verify deletion of S3 object through proxy server
+ """
+ flow = (GetFile('/tmp/input') >> PutS3Object() \
+ >> LogAttribute() \
+ >> DeleteS3Object(proxy_host='http-proxy',
+ proxy_port='3128',
+ proxy_username='admin',
+ proxy_password='test101') \
+ >> PutFile('/tmp/output/success'))
+
+ with DockerTestCluster(SingleFileOutputValidator('test',
subdir='success')) as cluster:
+ cluster.put_test_data('test_data')
+ cluster.deploy_flow(None, engine='s3-server')
+ cluster.deploy_flow(None, engine='http-proxy')
+ cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
+ assert cluster.check_output(60)
+ assert cluster.is_s3_bucket_empty()
+ assert
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
diff --git a/extensions/aws/AWSLoader.h b/extensions/aws/AWSLoader.h
index ed0d488..eb0d684 100644
--- a/extensions/aws/AWSLoader.h
+++ b/extensions/aws/AWSLoader.h
@@ -26,6 +26,7 @@
#include "utils/StringUtils.h"
#include "controllerservices/AWSCredentialsService.h"
#include "processors/PutS3Object.h"
+#include "processors/DeleteS3Object.h"
class AWSObjectFactory : public core::ObjectFactory {
public:
@@ -51,6 +52,7 @@ class AWSObjectFactory : public core::ObjectFactory {
std::vector<std::string> class_names;
class_names.push_back("AWSCredentialsService");
class_names.push_back("PutS3Object");
+ class_names.push_back("DeleteS3Object");
return class_names;
}
@@ -59,6 +61,8 @@ class AWSObjectFactory : public core::ObjectFactory {
return std::unique_ptr<ObjectFactory>(new
core::DefautObjectFactory<minifi::aws::controllers::AWSCredentialsService>());
} else if (utils::StringUtils::equalsIgnoreCase(class_name,
"PutS3Object")) {
return std::unique_ptr<ObjectFactory>(new
core::DefautObjectFactory<minifi::aws::processors::PutS3Object>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name,
"DeleteS3Object")) {
+ return std::unique_ptr<ObjectFactory>(new
core::DefautObjectFactory<minifi::aws::processors::DeleteS3Object>());
} else {
return nullptr;
}
diff --git a/extensions/aws/processors/DeleteS3Object.cpp
b/extensions/aws/processors/DeleteS3Object.cpp
new file mode 100644
index 0000000..cbdab68
--- /dev/null
+++ b/extensions/aws/processors/DeleteS3Object.cpp
@@ -0,0 +1,88 @@
+/**
+ * @file DeleteS3Object.cpp
+ * DeleteS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteS3Object.h"
+
+#include <set>
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const core::Property DeleteS3Object::Version(
+ core::PropertyBuilder::createProperty("Version")
+ ->withDescription("The Version of the Object to delete")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship DeleteS3Object::Success("success", "FlowFiles are
routed to success relationship");
+const core::Relationship DeleteS3Object::Failure("failure", "FlowFiles are
routed to failure relationship");
+
+void DeleteS3Object::initialize() {
+ // Add new supported properties
+ updateSupportedProperties({Version});
+ // Set the supported relationships
+ setSupportedRelationships({Failure, Success});
+}
+
+bool DeleteS3Object::getExpressionLanguageSupportedProperties(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) {
+ if (!S3Processor::getExpressionLanguageSupportedProperties(context,
flow_file)) {
+ return false;
+ }
+
+ context->getProperty(Version, version_, flow_file);
+ logger_->log_debug("DeleteS3Object: Version [%s]", version_);
+ return true;
+}
+
+void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
+ logger_->log_debug("DeleteS3Object onTrigger");
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
+ if (!flow_file) {
+ context->yield();
+ return;
+ }
+
+ if (!getExpressionLanguageSupportedProperties(context, flow_file)) {
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ if (s3_wrapper_->deleteObject(bucket_, object_key_, version_)) {
+ logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'",
object_key_, bucket_);
+ session->transfer(flow_file, Success);
+ } else {
+ logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'",
object_key_, bucket_);
+ session->transfer(flow_file, Failure);
+ }
+}
+
+} // namespace processors
+} // namespace aws
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/aws/processors/DeleteS3Object.h
b/extensions/aws/processors/DeleteS3Object.h
new file mode 100644
index 0000000..73cf507
--- /dev/null
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -0,0 +1,81 @@
+/**
+ * @file DeleteS3Object.h
+ * DeleteS3Object class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sstream>
+#include <utility>
+#include <memory>
+#include <string>
+
+#include "S3Processor.h"
+#include "utils/GeneralUtils.h"
+
+template<typename T>
+class S3TestsFixture;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+class DeleteS3Object : public S3Processor {
+ public:
+ static constexpr char const* ProcessorName = "DeleteS3Object";
+
+ // Supported Properties
+ static const core::Property Version;
+
+ // Supported Relationships
+ static const core::Relationship Failure;
+ static const core::Relationship Success;
+
+ explicit DeleteS3Object(std::string name, minifi::utils::Identifier uuid =
minifi::utils::Identifier())
+ : S3Processor(std::move(name), uuid,
logging::LoggerFactory<DeleteS3Object>::getLogger()) {
+ }
+
+ ~DeleteS3Object() override = default;
+
+ void initialize() override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
+
+ protected:
+ bool getExpressionLanguageSupportedProperties(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file) override;
+
+ private:
+ friend class ::S3TestsFixture<DeleteS3Object>;
+
+ explicit DeleteS3Object(std::string name, minifi::utils::Identifier uuid,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+ : S3Processor(std::move(name), uuid,
logging::LoggerFactory<DeleteS3Object>::getLogger(), std::move(s3_wrapper)) {
+ }
+
+ std::string version_;
+};
+
+REGISTER_RESOURCE(DeleteS3Object, "This Processor deletes FlowFiles on an
Amazon S3 Bucket.");
+
+} // namespace processors
+} // namespace aws
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/aws/processors/PutS3Object.cpp
b/extensions/aws/processors/PutS3Object.cpp
index 369283d..b106549 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -23,7 +23,7 @@
#include <string>
#include <set>
#include <memory>
-#include <map>
+#include <utility>
#include "AWSCredentialsService.h"
#include "properties/Properties.h"
@@ -41,25 +41,9 @@ const uint64_t PutS3Object::ReadCallback::MAX_SIZE = 5UL *
1024UL * 1024UL * 102
const uint64_t PutS3Object::ReadCallback::BUFFER_SIZE = 4096;
const std::set<std::string>
PutS3Object::CANNED_ACLS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::CANNED_ACL_MAP));
-const std::set<std::string> PutS3Object::REGIONS({region::AF_SOUTH_1,
region::AP_EAST_1, region::AP_NORTHEAST_1,
- region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1,
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
- region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1,
region::EU_CENTRAL_1, region::EU_NORTH_1,
- region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3,
region::ME_SOUTH_1, region::SA_EAST_1,
- region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1,
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
const std::set<std::string>
PutS3Object::STORAGE_CLASSES(minifi::utils::MapUtils::getKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
const std::set<std::string>
PutS3Object::SERVER_SIDE_ENCRYPTIONS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
-const core::Property PutS3Object::ObjectKey(
- core::PropertyBuilder::createProperty("Object Key")
- ->withDescription("The key of the S3 object. If none is given the filename
attribute will be used by default.")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::Bucket(
- core::PropertyBuilder::createProperty("Bucket")
- ->withDescription("The S3 bucket")
- ->isRequired(true)
- ->supportsExpressionLanguage(true)
- ->build());
const core::Property PutS3Object::ContentType(
core::PropertyBuilder::createProperty("Content Type")
->withDescription("Sets the Content-Type HTTP header indicating the type
of content stored in "
@@ -69,24 +53,6 @@ const core::Property PutS3Object::ContentType(
->supportsExpressionLanguage(true)
->withDefaultValue<std::string>("application/octet-stream")
->build());
-const core::Property PutS3Object::AccessKey(
- core::PropertyBuilder::createProperty("Access Key")
- ->withDescription("AWS account access key")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::SecretKey(
- core::PropertyBuilder::createProperty("Secret Key")
- ->withDescription("AWS account secret key")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::CredentialsFile(
- core::PropertyBuilder::createProperty("Credentials File")
- ->withDescription("Path to a file containing AWS access key and secret key
in properties file format. Properties used: accessKey and secretKey")
- ->build());
-const core::Property PutS3Object::AWSCredentialsProviderService(
- core::PropertyBuilder::createProperty("AWS Credentials Provider service")
- ->withDescription("The name of the AWS Credentials Provider controller
service that is used to obtain AWS credentials.")
- ->build());
const core::Property PutS3Object::StorageClass(
core::PropertyBuilder::createProperty("Storage Class")
->isRequired(true)
@@ -94,19 +60,6 @@ const core::Property PutS3Object::StorageClass(
->withAllowableValues<std::string>(PutS3Object::STORAGE_CLASSES)
->withDescription("AWS S3 Storage Class")
->build());
-const core::Property PutS3Object::Region(
- core::PropertyBuilder::createProperty("Region")
- ->isRequired(true)
- ->withDefaultValue<std::string>(region::US_WEST_2)
- ->withAllowableValues<std::string>(PutS3Object::REGIONS)
- ->withDescription("AWS Region")
- ->build());
-const core::Property PutS3Object::CommunicationsTimeout(
- core::PropertyBuilder::createProperty("Communications Timeout")
- ->isRequired(true)
- ->withDefaultValue<core::TimePeriodValue>("30 sec")
- ->withDescription("")
- ->build());
const core::Property PutS3Object::FullControlUserList(
core::PropertyBuilder::createProperty("FullControl User List")
->withDescription("A comma-separated list of Amazon User ID's or E-mail
addresses that specifies who should have Full Control for an object.")
@@ -135,14 +88,6 @@ const core::Property PutS3Object::CannedACL(
"PublicReadWrite, PublicRead, Private, AwsExecRead; will
be ignored if any other ACL/permission property is specified.")
->supportsExpressionLanguage(true)
->build());
-const core::Property PutS3Object::EndpointOverrideURL(
- core::PropertyBuilder::createProperty("Endpoint Override URL")
- ->withDescription("Endpoint URL to use instead of the AWS default
including scheme, host, "
- "port, and path. The AWS libraries select an endpoint
URL based on the AWS "
- "region, but this property overrides the selected
endpoint URL, allowing use "
- "with other S3-compatible endpoints.")
- ->supportsExpressionLanguage(true)
- ->build());
const core::Property PutS3Object::ServerSideEncryption(
core::PropertyBuilder::createProperty("Server Side Encryption")
->isRequired(true)
@@ -150,106 +95,16 @@ const core::Property PutS3Object::ServerSideEncryption(
->withAllowableValues<std::string>(PutS3Object::SERVER_SIDE_ENCRYPTIONS)
->withDescription("Specifies the algorithm used for server side
encryption.")
->build());
-const core::Property PutS3Object::ProxyHost(
- core::PropertyBuilder::createProperty("Proxy Host")
- ->withDescription("Proxy host name or IP")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::ProxyPort(
- core::PropertyBuilder::createProperty("Proxy Port")
- ->withDescription("The port number of the proxy host")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::ProxyUsername(
- core::PropertyBuilder::createProperty("Proxy Username")
- ->withDescription("Username to set when authenticating against proxy")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::ProxyPassword(
- core::PropertyBuilder::createProperty("Proxy Password")
- ->withDescription("Password to set when authenticating against proxy")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutS3Object::UseDefaultCredentials(
- core::PropertyBuilder::createProperty("Use Default Credentials")
- ->withDescription("If true, uses the Default Credential chain, including
EC2 instance profiles or roles, environment variables, default user
credentials, etc.")
- ->withDefaultValue<bool>(false)
- ->isRequired(true)
- ->build());
const core::Relationship PutS3Object::Success("success", "FlowFiles are routed
to success relationship");
const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed
to failure relationship");
void PutS3Object::initialize() {
- // Set the supported properties
- std::set<core::Property> properties;
- properties.insert(ObjectKey);
- properties.insert(Bucket);
- properties.insert(ContentType);
- properties.insert(AccessKey);
- properties.insert(SecretKey);
- properties.insert(CredentialsFile);
- properties.insert(AWSCredentialsProviderService);
- properties.insert(StorageClass);
- properties.insert(Region);
- properties.insert(CommunicationsTimeout);
- properties.insert(FullControlUserList);
- properties.insert(ReadPermissionUserList);
- properties.insert(ReadACLUserList);
- properties.insert(WriteACLUserList);
- properties.insert(CannedACL);
- properties.insert(EndpointOverrideURL);
- properties.insert(ServerSideEncryption);
- properties.insert(ProxyHost);
- properties.insert(ProxyPort);
- properties.insert(ProxyUsername);
- properties.insert(ProxyPassword);
- properties.insert(UseDefaultCredentials);
- setSupportedProperties(properties);
+ // Add new supported properties
+ updateSupportedProperties({ContentType, StorageClass, FullControlUserList,
ReadPermissionUserList,
+ ReadACLUserList, WriteACLUserList, CannedACL, ServerSideEncryption});
// Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Failure);
- relationships.insert(Success);
- setSupportedRelationships(relationships);
-}
-
-minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentialsFromControllerService(const
std::shared_ptr<core::ProcessContext> &context) const {
- std::string service_name;
- if (context->getProperty(AWSCredentialsProviderService.getName(),
service_name) && !service_name.empty()) {
- std::shared_ptr<core::controller::ControllerService> service =
context->getControllerService(service_name);
- if (nullptr != service) {
- auto aws_credentials_service =
std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
- if (aws_credentials_service) {
- return
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
- }
- }
- }
- return minifi::utils::nullopt;
-}
-
-minifi::utils::optional<Aws::Auth::AWSCredentials>
PutS3Object::getAWSCredentials(
- const std::shared_ptr<core::ProcessContext> &context,
- const std::shared_ptr<core::FlowFile> &flow_file) {
- auto service_cred = getAWSCredentialsFromControllerService(context);
- if (service_cred) {
- logger_->log_info("AWS Credentials successfully set from controller
service");
- return service_cred.value();
- }
-
- std::string access_key;
- context->getProperty(AccessKey, access_key, flow_file);
- aws_credentials_provider_.setAccessKey(access_key);
- std::string secret_key;
- context->getProperty(SecretKey, secret_key, flow_file);
- aws_credentials_provider_.setSecretKey(secret_key);
- std::string credential_file;
- context->getProperty(CredentialsFile.getName(), credential_file);
- aws_credentials_provider_.setCredentialsFile(credential_file);
- bool use_default_credentials = false;
- context->getProperty(UseDefaultCredentials.getName(),
use_default_credentials);
- aws_credentials_provider_.setUseDefaultCredentials(use_default_credentials);
-
- return aws_credentials_provider_.getAWSCredentials();
+ setSupportedRelationships({Failure, Success});
}
void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext>
&context) {
@@ -271,28 +126,8 @@ void PutS3Object::fillUserMetadata(const
std::shared_ptr<core::ProcessContext> &
logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
}
-bool PutS3Object::setProxy(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
- aws::s3::ProxyOptions proxy;
- context->getProperty(ProxyHost, proxy.host, flow_file);
- std::string port_str;
- if (context->getProperty(ProxyPort, port_str, flow_file) &&
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
- logger_->log_error("Proxy port invalid");
- return false;
- }
- context->getProperty(ProxyUsername, proxy.username, flow_file);
- context->getProperty(ProxyPassword, proxy.password, flow_file);
- if (!proxy.host.empty()) {
- s3_wrapper_->setProxy(proxy);
- logger_->log_info("Proxy for PutS3Object was set.");
- }
- return true;
-}
-
void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
- if (!context->getProperty(Bucket.getName(), put_s3_request_params_.bucket)
|| put_s3_request_params_.bucket.empty()) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or
invalid");
- }
- logger_->log_debug("PutS3Object: Bucket [%s]",
put_s3_request_params_.bucket);
+ S3Processor::onSchedule(context, sessionFactory);
if (!context->getProperty(StorageClass.getName(),
put_s3_request_params_.storage_class)
|| put_s3_request_params_.storage_class.empty()
@@ -301,21 +136,6 @@ void PutS3Object::onSchedule(const
std::shared_ptr<core::ProcessContext> &contex
}
logger_->log_debug("PutS3Object: Storage Class [%s]",
put_s3_request_params_.storage_class);
- std::string value;
- if (!context->getProperty(Region.getName(), value) || value.empty() ||
REGIONS.count(value) == 0) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or
invalid");
- }
- s3_wrapper_->setRegion(value);
- logger_->log_debug("PutS3Object: Region [%s]", value);
-
- uint64_t timeout_val;
- if (context->getProperty(CommunicationsTimeout.getName(), value) &&
!value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
- s3_wrapper_->setTimeout(timeout_val);
- logger_->log_debug("PutS3Object: Communications Timeout [%d]",
timeout_val);
- } else {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout
missing or invalid");
- }
-
if (!context->getProperty(ServerSideEncryption.getName(),
put_s3_request_params_.server_side_encryption)
|| put_s3_request_params_.server_side_encryption.empty()
||
SERVER_SIDE_ENCRYPTIONS.find(put_s3_request_params_.server_side_encryption) ==
SERVER_SIDE_ENCRYPTIONS.end()) {
@@ -374,41 +194,15 @@ bool PutS3Object::setAccessControl(const
std::shared_ptr<core::ProcessContext> &
bool PutS3Object::getExpressionLanguageSupportedProperties(
const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file) {
- context->getProperty(ObjectKey, put_s3_request_params_.object_key,
flow_file);
- if (put_s3_request_params_.object_key.empty()) {
- if (!flow_file->getAttribute("filename",
put_s3_request_params_.object_key) ||
put_s3_request_params_.object_key.empty()) {
- logger_->log_error("No Object Key is set and default object key
'filename' attribute could not be found!");
- return false;
- }
- }
- logger_->log_debug("PutS3Object: Object Key [%s]",
put_s3_request_params_.object_key);
- if (!context->getProperty(Bucket, put_s3_request_params_.bucket, flow_file)
|| put_s3_request_params_.bucket.empty()) {
- logger_->log_error("Bucket is invalid or empty!",
put_s3_request_params_.bucket);
+ if (!S3Processor::getExpressionLanguageSupportedProperties(context,
flow_file)) {
return false;
}
- logger_->log_debug("PutS3Object: Bucket [%s]",
put_s3_request_params_.bucket);
+ put_s3_request_params_.object_key = std::move(object_key_);
+ put_s3_request_params_.bucket = std::move(bucket_);
context->getProperty(ContentType, put_s3_request_params_.content_type,
flow_file);
logger_->log_debug("PutS3Object: Content Type [%s]",
put_s3_request_params_.content_type);
- auto credentials = getAWSCredentials(context, flow_file);
- if (!credentials) {
- logger_->log_error("AWS Credentials have not been set!");
- return false;
- }
- s3_wrapper_->setCredentials(credentials.value());
-
- if (!setProxy(context, flow_file)) {
- context->yield();
- return false;
- }
-
- std::string value;
- if (context->getProperty(EndpointOverrideURL, value, flow_file) &&
!value.empty()) {
- s3_wrapper_->setEndpointOverrideUrl(value);
- logger_->log_debug("PutS3Object: Endpoint Override URL [%d]", value);
- }
-
return setAccessControl(context, flow_file);
}
@@ -441,22 +235,23 @@ void PutS3Object::onTrigger(const
std::shared_ptr<core::ProcessContext> &context
logger_->log_debug("PutS3Object onTrigger");
std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
+ context->yield();
return;
}
if (!getExpressionLanguageSupportedProperties(context, flow_file)) {
- context->yield();
+ session->transfer(flow_file, Failure);
return;
}
PutS3Object::ReadCallback callback(flow_file->getSize(),
put_s3_request_params_, s3_wrapper_.get());
session->read(flow_file, &callback);
if (callback.result_ == minifi::utils::nullopt) {
- logger_->log_error("Failed to upload S3 object to bucket %s",
put_s3_request_params_.bucket);
+ logger_->log_error("Failed to upload S3 object to bucket '%s'",
put_s3_request_params_.bucket);
session->transfer(flow_file, Failure);
} else {
setAttributes(session, flow_file, callback.result_.value());
- logger_->log_debug("Successfully uploaded S3 object %s to bucket %s",
put_s3_request_params_.object_key, put_s3_request_params_.bucket);
+ logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'",
put_s3_request_params_.object_key, put_s3_request_params_.bucket);
session->transfer(flow_file, Success);
}
}
diff --git a/extensions/aws/processors/PutS3Object.h
b/extensions/aws/processors/PutS3Object.h
index 5c29b11..2bb340a 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -28,13 +28,11 @@
#include <set>
#include <algorithm>
-#include "S3Wrapper.h"
-#include "core/Property.h"
-#include "core/Processor.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/OptionalUtils.h"
-#include "AWSCredentialsProvider.h"
+#include "S3Processor.h"
+#include "utils/GeneralUtils.h"
+
+template<typename T>
+class S3TestsFixture;
namespace org {
namespace apache {
@@ -43,85 +41,34 @@ namespace minifi {
namespace aws {
namespace processors {
-namespace region {
-
-constexpr const char *AF_SOUTH_1 = "af-south-1";
-constexpr const char *AP_EAST_1 = "ap-east-1";
-constexpr const char *AP_NORTHEAST_1 = "ap-northeast-1";
-constexpr const char *AP_NORTHEAST_2 = "ap-northeast-2";
-constexpr const char *AP_NORTHEAST_3 = "ap-northeast-3";
-constexpr const char *AP_SOUTH_1 = "ap-south-1";
-constexpr const char *AP_SOUTHEAST_1 = "ap-southeast-1";
-constexpr const char *AP_SOUTHEAST_2 = "ap-southeast-2";
-constexpr const char *CA_CENTRAL_1 = "ca-central-1";
-constexpr const char *CN_NORTH_1 = "cn-north-1";
-constexpr const char *CN_NORTHWEST_1 = "cn-northwest-1";
-constexpr const char *EU_CENTRAL_1 = "eu-central-1";
-constexpr const char *EU_NORTH_1 = "eu-north-1";
-constexpr const char *EU_SOUTH_1 = "eu-south-1";
-constexpr const char *EU_WEST_1 = "eu-west-1";
-constexpr const char *EU_WEST_2 = "eu-west-2";
-constexpr const char *EU_WEST_3 = "eu-west-3";
-constexpr const char *ME_SOUTH_1 = "me-south-1";
-constexpr const char *SA_EAST_1 = "sa-east-1";
-constexpr const char *US_EAST_1 = "us-east-1";
-constexpr const char *US_EAST_2 = "us-east-2";
-constexpr const char *US_GOV_EAST_1 = "us-gov-east-1";
-constexpr const char *US_GOV_WEST_1 = "us-gov-west-1";
-constexpr const char *US_WEST_1 = "us-west-1";
-constexpr const char *US_WEST_2 = "us-west-2";
-
-} // namespace region
-
-class PutS3Object : public core::Processor {
+class PutS3Object : public S3Processor {
public:
static constexpr char const* ProcessorName = "PutS3Object";
static const std::set<std::string> CANNED_ACLS;
- static const std::set<std::string> REGIONS;
static const std::set<std::string> STORAGE_CLASSES;
static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
// Supported Properties
- static const core::Property ObjectKey;
- static const core::Property Bucket;
static const core::Property ContentType;
- static const core::Property AccessKey;
- static const core::Property SecretKey;
- static const core::Property CredentialsFile;
- static const core::Property AWSCredentialsProviderService;
static const core::Property StorageClass;
static const core::Property ServerSideEncryption;
- static const core::Property Region;
- static const core::Property CommunicationsTimeout;
static const core::Property FullControlUserList;
static const core::Property ReadPermissionUserList;
static const core::Property ReadACLUserList;
static const core::Property WriteACLUserList;
static const core::Property CannedACL;
- static const core::Property EndpointOverrideURL;
- static const core::Property ProxyHost;
- static const core::Property ProxyPort;
- static const core::Property ProxyUsername;
- static const core::Property ProxyPassword;
- static const core::Property UseDefaultCredentials;
// Supported Relationships
static const core::Relationship Failure;
static const core::Relationship Success;
explicit PutS3Object(std::string name, minifi::utils::Identifier uuid =
minifi::utils::Identifier())
- : PutS3Object(name, uuid,
minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
- }
-
- explicit PutS3Object(std::string name, minifi::utils::Identifier uuid,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
- : core::Processor(std::move(name), uuid)
- , s3_wrapper_(std::move(s3_wrapper)) {
+ : S3Processor(std::move(name), uuid,
logging::LoggerFactory<PutS3Object>::getLogger()) {
}
~PutS3Object() override = default;
- bool supportsDynamicProperties() override { return true; }
void initialize() override;
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
@@ -169,22 +116,24 @@ class PutS3Object : public core::Processor {
minifi::utils::optional<minifi::aws::s3::PutObjectResult> result_ =
minifi::utils::nullopt;
};
+ protected:
+ bool getExpressionLanguageSupportedProperties(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file) override;
+
private:
- minifi::utils::optional<Aws::Auth::AWSCredentials>
getAWSCredentialsFromControllerService(const
std::shared_ptr<core::ProcessContext> &context) const;
- minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
+ friend class ::S3TestsFixture<PutS3Object>;
+
+ explicit PutS3Object(std::string name, minifi::utils::Identifier uuid,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+ : S3Processor(std::move(name), uuid,
logging::LoggerFactory<PutS3Object>::getLogger(), std::move(s3_wrapper)) {
+ }
+
void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
- bool setProxy(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
- bool getExpressionLanguageSupportedProperties(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
std::string parseAccessControlList(const std::string &comma_separated_list)
const;
bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file);
bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file);
void setAttributes(const std::shared_ptr<core::ProcessSession> &session,
const std::shared_ptr<core::FlowFile> &flow_file, const
minifi::aws::s3::PutObjectResult &put_object_result);
- std::shared_ptr<logging::Logger>
logger_{logging::LoggerFactory<PutS3Object>::getLogger()};
aws::s3::PutObjectRequestParameters put_s3_request_params_;
- std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper_;
std::string user_metadata_;
- aws::AWSCredentialsProvider aws_credentials_provider_;
};
REGISTER_RESOURCE(PutS3Object, "This Processor puts FlowFiles to an Amazon S3
Bucket.");
diff --git a/extensions/aws/processors/S3Processor.cpp
b/extensions/aws/processors/S3Processor.cpp
new file mode 100644
index 0000000..1a02b7f
--- /dev/null
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -0,0 +1,262 @@
+/**
+ * @file S3Processor.cpp
+ * Base S3 processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "S3Processor.h"
+
+#include <string>
+#include <set>
+#include <memory>
+
+#include "S3Wrapper.h"
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::set<std::string> S3Processor::REGIONS({region::AF_SOUTH_1,
region::AP_EAST_1, region::AP_NORTHEAST_1,
+ region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1,
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
+ region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1,
region::EU_CENTRAL_1, region::EU_NORTH_1,
+ region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3,
region::ME_SOUTH_1, region::SA_EAST_1,
+ region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1,
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
+
+const core::Property S3Processor::ObjectKey(
+ core::PropertyBuilder::createProperty("Object Key")
+ ->withDescription("The key of the S3 object. If none is given the filename
attribute will be used by default.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("The S3 bucket")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::AccessKey(
+ core::PropertyBuilder::createProperty("Access Key")
+ ->withDescription("AWS account access key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::SecretKey(
+ core::PropertyBuilder::createProperty("Secret Key")
+ ->withDescription("AWS account secret key")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::CredentialsFile(
+ core::PropertyBuilder::createProperty("Credentials File")
+ ->withDescription("Path to a file containing AWS access key and secret key
in properties file format. Properties used: accessKey and secretKey")
+ ->build());
+const core::Property S3Processor::AWSCredentialsProviderService(
+ core::PropertyBuilder::createProperty("AWS Credentials Provider service")
+ ->withDescription("The name of the AWS Credentials Provider controller
service that is used to obtain AWS credentials.")
+ ->build());
+const core::Property S3Processor::Region(
+ core::PropertyBuilder::createProperty("Region")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(region::US_WEST_2)
+ ->withAllowableValues<std::string>(S3Processor::REGIONS)
+ ->withDescription("AWS Region")
+ ->build());
+const core::Property S3Processor::CommunicationsTimeout(
+ core::PropertyBuilder::createProperty("Communications Timeout")
+ ->isRequired(true)
+ ->withDefaultValue<core::TimePeriodValue>("30 sec")
+ ->withDescription("Sets the timeout of the communication between the AWS
server and the client")
+ ->build());
+const core::Property S3Processor::EndpointOverrideURL(
+ core::PropertyBuilder::createProperty("Endpoint Override URL")
+ ->withDescription("Endpoint URL to use instead of the AWS default
including scheme, host, "
+ "port, and path. The AWS libraries select an endpoint
URL based on the AWS "
+ "region, but this property overrides the selected
endpoint URL, allowing use "
+ "with other S3-compatible endpoints.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::ProxyHost(
+ core::PropertyBuilder::createProperty("Proxy Host")
+ ->withDescription("Proxy host name or IP")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::ProxyPort(
+ core::PropertyBuilder::createProperty("Proxy Port")
+ ->withDescription("The port number of the proxy host")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::ProxyUsername(
+ core::PropertyBuilder::createProperty("Proxy Username")
+ ->withDescription("Username to set when authenticating against proxy")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::ProxyPassword(
+ core::PropertyBuilder::createProperty("Proxy Password")
+ ->withDescription("Password to set when authenticating against proxy")
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property S3Processor::UseDefaultCredentials(
+ core::PropertyBuilder::createProperty("Use Default Credentials")
+ ->withDescription("If true, uses the Default Credential chain, including
EC2 instance profiles or roles, environment variables, default user
credentials, etc.")
+ ->withDefaultValue<bool>(false)
+ ->isRequired(true)
+ ->build());
+
+S3Processor::S3Processor(std::string name, minifi::utils::Identifier uuid,
const std::shared_ptr<logging::Logger> &logger)
+ : core::Processor(std::move(name), uuid)
+ , logger_(logger)
+ , s3_wrapper_(minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
+ setSupportedProperties({ObjectKey, Bucket, AccessKey, SecretKey,
CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region,
CommunicationsTimeout,
+ EndpointOverrideURL, ProxyHost, ProxyPort,
ProxyUsername, ProxyPassword, UseDefaultCredentials});
+}
+
+S3Processor::S3Processor(std::string name, minifi::utils::Identifier uuid,
const std::shared_ptr<logging::Logger> &logger,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+ : core::Processor(std::move(name), uuid)
+ , logger_(logger)
+ , s3_wrapper_(std::move(s3_wrapper)) {
+ setSupportedProperties({ObjectKey, Bucket, AccessKey, SecretKey,
CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region,
CommunicationsTimeout,
+ EndpointOverrideURL, ProxyHost, ProxyPort,
ProxyUsername, ProxyPassword, UseDefaultCredentials});
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
S3Processor::getAWSCredentialsFromControllerService(const
std::shared_ptr<core::ProcessContext> &context) const {
+ std::string service_name;
+ if (!context->getProperty(AWSCredentialsProviderService.getName(),
service_name) || service_name.empty()) {
+ return minifi::utils::nullopt;
+ }
+
+ std::shared_ptr<core::controller::ControllerService> service =
context->getControllerService(service_name);
+ if (!service) {
+ return minifi::utils::nullopt;
+ }
+
+ auto aws_credentials_service =
std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
+ if (!aws_credentials_service) {
+ return minifi::utils::nullopt;
+ }
+
+ return
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials>
S3Processor::getAWSCredentials(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) {
+ auto service_cred = getAWSCredentialsFromControllerService(context);
+ if (service_cred) {
+ logger_->log_info("AWS Credentials successfully set from controller
service");
+ return service_cred.value();
+ }
+
+ std::string access_key;
+ context->getProperty(AccessKey, access_key, flow_file);
+ aws_credentials_provider_.setAccessKey(access_key);
+ std::string secret_key;
+ context->getProperty(SecretKey, secret_key, flow_file);
+ aws_credentials_provider_.setSecretKey(secret_key);
+ std::string credential_file;
+ context->getProperty(CredentialsFile.getName(), credential_file);
+ aws_credentials_provider_.setCredentialsFile(credential_file);
+ bool use_default_credentials = false;
+ context->getProperty(UseDefaultCredentials.getName(),
use_default_credentials);
+ aws_credentials_provider_.setUseDefaultCredentials(use_default_credentials);
+
+ return aws_credentials_provider_.getAWSCredentials();
+}
+
+bool S3Processor::setProxy(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
+ aws::s3::ProxyOptions proxy;
+ context->getProperty(ProxyHost, proxy.host, flow_file);
+ std::string port_str;
+ if (context->getProperty(ProxyPort, port_str, flow_file) &&
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
+ logger_->log_error("Proxy port invalid");
+ return false;
+ }
+ context->getProperty(ProxyUsername, proxy.username, flow_file);
+ context->getProperty(ProxyPassword, proxy.password, flow_file);
+ if (!proxy.host.empty()) {
+ s3_wrapper_->setProxy(proxy);
+ logger_->log_info("Proxy for S3Processor was set.");
+ }
+ return true;
+}
+
+void S3Processor::onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ if (!context->getProperty(Bucket.getName(), bucket_) || bucket_.empty()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or
invalid");
+ }
+ logger_->log_debug("S3Processor: Bucket [%s]", bucket_);
+
+ std::string value;
+ if (!context->getProperty(Region.getName(), value) || value.empty() ||
REGIONS.count(value) == 0) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or
invalid");
+ }
+ s3_wrapper_->setRegion(value);
+ logger_->log_debug("S3Processor: Region [%s]", value);
+
+ uint64_t timeout_val;
+ if (context->getProperty(CommunicationsTimeout.getName(), value) &&
!value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
+ s3_wrapper_->setTimeout(timeout_val);
+ logger_->log_debug("S3Processor: Communications Timeout [%d]",
timeout_val);
+ } else {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout
missing or invalid");
+ }
+}
+
+bool S3Processor::getExpressionLanguageSupportedProperties(
+ const std::shared_ptr<core::ProcessContext> &context,
+ const std::shared_ptr<core::FlowFile> &flow_file) {
+ context->getProperty(ObjectKey, object_key_, flow_file);
+ if (object_key_.empty() && (!flow_file->getAttribute("filename",
object_key_) || object_key_.empty())) {
+ logger_->log_error("No Object Key is set and default object key 'filename'
attribute could not be found!");
+ return false;
+ }
+ logger_->log_debug("S3Processor: Object Key [%s]", object_key_);
+
+ if (!context->getProperty(Bucket, bucket_, flow_file) || bucket_.empty()) {
+ logger_->log_error("Bucket '%s' is invalid or empty!", bucket_);
+ return false;
+ }
+ logger_->log_debug("S3Processor: Bucket [%s]", bucket_);
+
+ auto credentials = getAWSCredentials(context, flow_file);
+ if (!credentials) {
+ logger_->log_error("AWS Credentials have not been set!");
+ return false;
+ }
+ s3_wrapper_->setCredentials(credentials.value());
+
+ if (!setProxy(context, flow_file)) {
+ return false;
+ }
+
+ std::string value;
+ if (context->getProperty(EndpointOverrideURL, value, flow_file) &&
!value.empty()) {
+ s3_wrapper_->setEndpointOverrideUrl(value);
+ logger_->log_debug("S3Processor: Endpoint Override URL [%d]", value);
+ }
+
+ return true;
+}
+
+} // namespace processors
+} // namespace aws
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/aws/processors/PutS3Object.h
b/extensions/aws/processors/S3Processor.h
similarity index 50%
copy from extensions/aws/processors/PutS3Object.h
copy to extensions/aws/processors/S3Processor.h
index 5c29b11..a5cb422 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/S3Processor.h
@@ -1,6 +1,6 @@
/**
- * @file PutS3Object.h
- * PutS3Object class declaration
+ * @file S3Processor.h
+ * Base S3 processor class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -20,21 +20,20 @@
#pragma once
-#include <sstream>
#include <utility>
-#include <vector>
#include <memory>
#include <string>
#include <set>
-#include <algorithm>
-#include "S3Wrapper.h"
+#include "aws/core/auth/AWSCredentialsProvider.h"
+
+#include "S3WrapperBase.h"
+#include "AWSCredentialsProvider.h"
#include "core/Property.h"
#include "core/Processor.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/OptionalUtils.h"
-#include "AWSCredentialsProvider.h"
namespace org {
namespace apache {
@@ -73,32 +72,19 @@ constexpr const char *US_WEST_2 = "us-west-2";
} // namespace region
-class PutS3Object : public core::Processor {
+class S3Processor : public core::Processor {
public:
- static constexpr char const* ProcessorName = "PutS3Object";
-
- static const std::set<std::string> CANNED_ACLS;
static const std::set<std::string> REGIONS;
- static const std::set<std::string> STORAGE_CLASSES;
- static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
// Supported Properties
static const core::Property ObjectKey;
static const core::Property Bucket;
- static const core::Property ContentType;
static const core::Property AccessKey;
static const core::Property SecretKey;
static const core::Property CredentialsFile;
static const core::Property AWSCredentialsProviderService;
- static const core::Property StorageClass;
- static const core::Property ServerSideEncryption;
static const core::Property Region;
static const core::Property CommunicationsTimeout;
- static const core::Property FullControlUserList;
- static const core::Property ReadPermissionUserList;
- static const core::Property ReadACLUserList;
- static const core::Property WriteACLUserList;
- static const core::Property CannedACL;
static const core::Property EndpointOverrideURL;
static const core::Property ProxyHost;
static const core::Property ProxyPort;
@@ -106,89 +92,26 @@ class PutS3Object : public core::Processor {
static const core::Property ProxyPassword;
static const core::Property UseDefaultCredentials;
- // Supported Relationships
- static const core::Relationship Failure;
- static const core::Relationship Success;
-
- explicit PutS3Object(std::string name, minifi::utils::Identifier uuid =
minifi::utils::Identifier())
- : PutS3Object(name, uuid,
minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
- }
-
- explicit PutS3Object(std::string name, minifi::utils::Identifier uuid,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
- : core::Processor(std::move(name), uuid)
- , s3_wrapper_(std::move(s3_wrapper)) {
- }
-
- ~PutS3Object() override = default;
+ explicit S3Processor(std::string name, minifi::utils::Identifier uuid, const
std::shared_ptr<logging::Logger> &logger);
bool supportsDynamicProperties() override { return true; }
- void initialize() override;
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
- void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
-
- class ReadCallback : public InputStreamCallback {
- public:
- static const uint64_t MAX_SIZE;
- static const uint64_t BUFFER_SIZE;
-
- ReadCallback(uint64_t flow_size, const
minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3WrapperBase*
s3_wrapper)
- : flow_size_(flow_size)
- , options_(options)
- , s3_wrapper_(s3_wrapper) {
- }
-
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
- if (flow_size_ > MAX_SIZE) {
- return -1;
- }
- std::vector<uint8_t> buffer;
- auto data_stream = std::make_shared<std::stringstream>();
- buffer.reserve(BUFFER_SIZE);
- read_size_ = 0;
- while (read_size_ < flow_size_) {
- auto next_read_size = std::min(flow_size_ - read_size_, BUFFER_SIZE);
- int read_ret = stream->read(buffer.data(), next_read_size);
- if (read_ret < 0) {
- return -1;
- }
- if (read_ret > 0) {
- data_stream->write(reinterpret_cast<char*>(buffer.data()),
next_read_size);
- read_size_ += read_ret;
- } else {
- break;
- }
- }
- result_ = s3_wrapper_->putObject(options_, data_stream);
- return read_size_;
- }
-
- uint64_t flow_size_;
- const minifi::aws::s3::PutObjectRequestParameters& options_;
- aws::s3::S3WrapperBase* s3_wrapper_;
- uint64_t read_size_ = 0;
- minifi::utils::optional<minifi::aws::s3::PutObjectResult> result_ =
minifi::utils::nullopt;
- };
-
- private:
+
+ protected:
+ explicit S3Processor(std::string name, minifi::utils::Identifier uuid, const
std::shared_ptr<logging::Logger> &logger,
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper);
+
minifi::utils::optional<Aws::Auth::AWSCredentials>
getAWSCredentialsFromControllerService(const
std::shared_ptr<core::ProcessContext> &context) const;
minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
- void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
bool setProxy(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
- bool getExpressionLanguageSupportedProperties(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
- std::string parseAccessControlList(const std::string &comma_separated_list)
const;
- bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file);
- bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file);
- void setAttributes(const std::shared_ptr<core::ProcessSession> &session,
const std::shared_ptr<core::FlowFile> &flow_file, const
minifi::aws::s3::PutObjectResult &put_object_result);
-
- std::shared_ptr<logging::Logger>
logger_{logging::LoggerFactory<PutS3Object>::getLogger()};
- aws::s3::PutObjectRequestParameters put_s3_request_params_;
+ virtual bool getExpressionLanguageSupportedProperties(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::FlowFile> &flow_file);
+
+ std::shared_ptr<logging::Logger> logger_;
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper_;
- std::string user_metadata_;
+ std::string bucket_;
+ std::string object_key_;
aws::AWSCredentialsProvider aws_credentials_provider_;
};
-REGISTER_RESOURCE(PutS3Object, "This Processor puts FlowFiles to an Amazon S3
Bucket.");
-
} // namespace processors
} // namespace aws
} // namespace minifi
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index 825f34a..8056e28 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -35,11 +35,27 @@ minifi::utils::optional<Aws::S3::Model::PutObjectResult>
S3Wrapper::sendPutObjec
Aws::S3::Model::PutObjectOutcome outcome = s3_client.PutObject(request);
if (outcome.IsSuccess()) {
- logger_->log_info("Added S3 object %s to bucket %s", request.GetKey(),
request.GetBucket());
+ logger_->log_info("Added S3 object '%s' to bucket '%s'",
request.GetKey(), request.GetBucket());
return outcome.GetResultWithOwnership();
} else {
- logger_->log_error("PutS3Object failed with the following: '%s'",
outcome.GetError().GetMessage());
- return minifi::utils::nullopt;
+ logger_->log_error("PutS3Object failed with the following: '%s'",
outcome.GetError().GetMessage());
+ return minifi::utils::nullopt;
+ }
+}
+
+bool S3Wrapper::sendDeleteObjectRequest(const
Aws::S3::Model::DeleteObjectRequest& request) {
+ Aws::S3::S3Client s3_client(credentials_, client_config_);
+ Aws::S3::Model::DeleteObjectOutcome outcome =
s3_client.DeleteObject(request);
+
+ if (outcome.IsSuccess()) {
+ logger_->log_info("Deleted S3 object '%s' from bucket '%s'",
request.GetKey(), request.GetBucket());
+ return true;
+ } else if (outcome.GetError().GetErrorType() ==
Aws::S3::S3Errors::NO_SUCH_KEY) {
+ logger_->log_info("S3 object '%s' was not found in bucket '%s'",
request.GetKey(), request.GetBucket());
+ return true;
+ } else {
+ logger_->log_error("DeleteS3Object failed with the following: '%s'",
outcome.GetError().GetMessage());
+ return false;
}
}
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index 3302122..323744e 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -34,6 +34,7 @@ namespace s3 {
class S3Wrapper : public S3WrapperBase {
protected:
minifi::utils::optional<Aws::S3::Model::PutObjectResult>
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override;
+ bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest&
request) override;
};
} // namespace s3
diff --git a/extensions/aws/s3/S3WrapperBase.cpp
b/extensions/aws/s3/S3WrapperBase.cpp
index 9cf2fad..3468834 100644
--- a/extensions/aws/s3/S3WrapperBase.cpp
+++ b/extensions/aws/s3/S3WrapperBase.cpp
@@ -124,6 +124,16 @@ minifi::utils::optional<PutObjectResult>
S3WrapperBase::putObject(const PutObjec
}
}
+bool S3WrapperBase::deleteObject(const std::string& bucket, const std::string&
object_key, const std::string& version) {
+ Aws::S3::Model::DeleteObjectRequest request;
+ request.SetBucket(bucket);
+ request.SetKey(object_key);
+ if (!version.empty()) {
+ request.SetVersionId(version);
+ }
+ return sendDeleteObjectRequest(request);
+}
+
} // namespace s3
} // namespace aws
} // namespace minifi
diff --git a/extensions/aws/s3/S3WrapperBase.h
b/extensions/aws/s3/S3WrapperBase.h
index 8204299..2416792 100644
--- a/extensions/aws/s3/S3WrapperBase.h
+++ b/extensions/aws/s3/S3WrapperBase.h
@@ -28,6 +28,8 @@
#include "aws/core/auth/AWSCredentialsProvider.h"
#include "aws/s3/S3Client.h"
#include "aws/s3/model/PutObjectRequest.h"
+#include "aws/s3/model/PutObjectResult.h"
+#include "aws/s3/model/DeleteObjectRequest.h"
#include "aws/s3/model/StorageClass.h"
#include "aws/s3/model/ServerSideEncryption.h"
#include "aws/s3/model/ObjectCannedACL.h"
@@ -107,11 +109,13 @@ class S3WrapperBase {
void setProxy(const ProxyOptions& proxy);
minifi::utils::optional<PutObjectResult> putObject(const
PutObjectRequestParameters& options, std::shared_ptr<Aws::IOStream>
data_stream);
+ bool deleteObject(const std::string& bucket, const std::string& object_key,
const std::string& version = "");
virtual ~S3WrapperBase() = default;
protected:
virtual minifi::utils::optional<Aws::S3::Model::PutObjectResult>
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) = 0;
+ virtual bool sendDeleteObjectRequest(const
Aws::S3::Model::DeleteObjectRequest& request) = 0;
void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const
std::string& canned_acl) const;
static std::string getExpiryDate(const std::string& expiration);
static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption
encryption);
diff --git a/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
b/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
new file mode 100644
index 0000000..6255e2e
--- /dev/null
+++ b/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "S3TestsFixture.h"
+#include "processors/DeleteS3Object.h"
+
+using DeleteS3ObjectTestsFixture =
S3TestsFixture<minifi::aws::processors::DeleteS3Object>;
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test AWS credential setting",
"[awsCredentials]") {
+ setBucket();
+
+ SECTION("Test property credentials") {
+ setAccesKeyCredentialsInProcessor();
+ }
+
+ SECTION("Test credentials setting from AWS Credentials service") {
+ setAccessKeyCredentialsInController();
+ setCredentialsService();
+ }
+
+ SECTION("Test credentials file setting") {
+ setCredentialFile(s3_processor);
+ }
+
+ SECTION("Test credentials file setting from AWS Credentials service") {
+ setCredentialFile(aws_credentials_service);
+ setCredentialsService();
+ }
+
+ SECTION("Test credentials setting using default credential chain") {
+ setUseDefaultCredentialsChain(s3_processor);
+ }
+
+ SECTION("Test credentials setting from AWS Credentials service using default
credential chain") {
+ setUseDefaultCredentialsChain(aws_credentials_service);
+ setCredentialsService();
+ }
+
+ test_controller.runSession(plan, true);
+ REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+ REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSSecretKey() == "secret");
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test required property not set",
"[awsS3Config]") {
+ SECTION("Test credentials not set") {
+ }
+
+ SECTION("Test no bucket is set") {
+ setAccesKeyCredentialsInProcessor();
+ }
+
+ SECTION("Test no object key is set") {
+ setRequiredProperties();
+ plan->setProperty(update_attribute, "filename", "", true);
+ }
+
+ SECTION("Test region is empty") {
+ setRequiredProperties();
+ plan->setProperty(s3_processor, "Region", "");
+ }
+
+ REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test proxy setting",
"[awsS3Proxy]") {
+ setRequiredProperties();
+ setProxy();
+ test_controller.runSession(plan, true);
+ checkProxySettings();
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test success case with default
values", "[awsS3DeleteSuccess]") {
+ setRequiredProperties();
+ test_controller.runSession(plan, true);
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetBucket() ==
"testBucket");
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetKey() ==
INPUT_FILENAME);
+ REQUIRE(!mock_s3_wrapper_ptr->delete_object_request.VersionIdHasBeenSet());
+ REQUIRE(LogTestController::getInstance().contains("Successfully deleted S3
object"));
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test version setting",
"[awsS3DeleteWithVersion]") {
+ setRequiredProperties();
+ plan->setProperty(update_attribute, "s3.version", "v1", true);
+ plan->setProperty(s3_processor, "Version", "${s3.version}");
+ test_controller.runSession(plan, true);
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetVersionId() == "v1");
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.VersionIdHasBeenSet());
+ REQUIRE(LogTestController::getInstance().contains("Successfully deleted S3
object"));
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test optional client
configuration values", "[awsS3DeleteOptionalClientConfig]") {
+ setRequiredProperties();
+ plan->setProperty(s3_processor, "Region",
minifi::aws::processors::region::US_EAST_1);
+ plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
+ plan->setProperty(update_attribute, "test.endpoint",
"http://localhost:1234", true);
+ plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
+ test_controller.runSession(plan, true);
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region ==
minifi::aws::processors::region::US_EAST_1);
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride ==
"http://localhost:1234");
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test failure case",
"[awsS3DeleteFailure]") {
+ auto log_failure = plan->addProcessor(
+ "LogAttribute",
+ "LogFailure",
+ core::Relationship("failure", "d"));
+ plan->addConnection(s3_processor, core::Relationship("failure", "d"),
log_failure);
+ setRequiredProperties();
+ plan->setProperty(s3_processor, "Version", "v1");
+ log_failure->setAutoTerminatedRelationships({{core::Relationship("success",
"d")}});
+ mock_s3_wrapper_ptr->setDeleteObjectResult(false);
+ test_controller.runSession(plan, true);
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetBucket() ==
"testBucket");
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetKey() ==
INPUT_FILENAME);
+ REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetVersionId() == "v1");
+ REQUIRE(LogTestController::getInstance().contains("Failed to delete S3
object"));
+}
diff --git a/libminifi/test/aws-tests/MockS3Wrapper.h
b/libminifi/test/aws-tests/MockS3Wrapper.h
new file mode 100644
index 0000000..6958938
--- /dev/null
+++ b/libminifi/test/aws-tests/MockS3Wrapper.h
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "s3/S3WrapperBase.h"
+
+const std::string S3_VERSION = "1.2.3";
+const std::string S3_ETAG = "\"tag-123\"";
+const std::string S3_ETAG_UNQUOTED = "tag-123";
+const std::string S3_EXPIRATION = "expiry-date=\"Wed, 28 Oct 2020 00:00:00
GMT\", rule-id=\"my_expiration_rule\"";
+const std::string S3_EXPIRATION_DATE = "Wed, 28 Oct 2020 00:00:00 GMT";
+const Aws::S3::Model::ServerSideEncryption S3_SSEALGORITHM =
Aws::S3::Model::ServerSideEncryption::aws_kms;
+const std::string S3_SSEALGORITHM_STR = "aws_kms";
+
+class MockS3Wrapper : public minifi::aws::s3::S3WrapperBase {
+ public:
+ minifi::utils::optional<Aws::S3::Model::PutObjectResult>
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
+ put_object_request = request;
+
+ Aws::S3::Model::PutObjectResult put_s3_result;
+ if (!return_empty_result_) {
+ put_s3_result.SetVersionId(S3_VERSION);
+ put_s3_result.SetETag(S3_ETAG);
+ put_s3_result.SetExpiration(S3_EXPIRATION);
+ put_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
+ }
+ return put_s3_result;
+ }
+
+ bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest&
request) override {
+ delete_object_request = request;
+ return delete_object_result_;
+ }
+
+ Aws::Auth::AWSCredentials getCredentials() const {
+ return credentials_;
+ }
+
+ Aws::Client::ClientConfiguration getClientConfig() const {
+ return client_config_;
+ }
+
+ std::string getPutObjectRequestBody() const {
+ std::istreambuf_iterator<char> buf_it;
+ return
std::string(std::istreambuf_iterator<char>(*put_object_request.GetBody()),
buf_it);
+ }
+
+ void returnEmptyS3Result(bool return_empty_result = true) {
+ return_empty_result_ = return_empty_result;
+ }
+
+ void setDeleteObjectResult(bool delete_object_result) {
+ delete_object_result_ = delete_object_result;
+ }
+
+ Aws::S3::Model::PutObjectRequest put_object_request;
+ Aws::S3::Model::DeleteObjectRequest delete_object_request;
+
+ private:
+ bool delete_object_result_ = true;
+ bool return_empty_result_ = false;
+};
diff --git a/libminifi/test/aws-tests/PutS3ObjectTests.cpp
b/libminifi/test/aws-tests/PutS3ObjectTests.cpp
index fbf47e8..1bc3030 100644
--- a/libminifi/test/aws-tests/PutS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/PutS3ObjectTests.cpp
@@ -16,142 +16,11 @@
* limitations under the License.
*/
-#include <stdlib.h>
-#include <iostream>
-#include <map>
-
-#include "core/Processor.h"
-#include "../TestBase.h"
+#include "S3TestsFixture.h"
#include "processors/PutS3Object.h"
-#include "processors/GetFile.h"
-#include "processors/LogAttribute.h"
-#include "processors/UpdateAttribute.h"
-#include "s3/S3WrapperBase.h"
-#include "utils/file/FileUtils.h"
-
-const std::string S3_VERSION = "1.2.3";
-const std::string S3_ETAG = "\"tag-123\"";
-const std::string S3_ETAG_UNQUOTED = "tag-123";
-const std::string S3_EXPIRATION = "expiry-date=\"Wed, 28 Oct 2020 00:00:00
GMT\", rule-id=\"my_expiration_rule\"";
-const std::string S3_EXPIRATION_DATE = "Wed, 28 Oct 2020 00:00:00 GMT";
-const Aws::S3::Model::ServerSideEncryption S3_SSEALGORITHM =
Aws::S3::Model::ServerSideEncryption::aws_kms;
-const std::string S3_SSEALGORITHM_STR = "aws_kms";
-class MockS3Wrapper : public minifi::aws::s3::S3WrapperBase {
+class PutS3ObjectTestsFixture : public
S3TestsFixture<minifi::aws::processors::PutS3Object> {
public:
- Aws::Auth::AWSCredentials getCredentials() const {
- return credentials_;
- }
-
- Aws::Client::ClientConfiguration getClientConfig() const {
- return client_config_;
- }
-
- minifi::utils::optional<Aws::S3::Model::PutObjectResult>
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
- std::istreambuf_iterator<char> buf_it;
- put_s3_data =
std::string(std::istreambuf_iterator<char>(*request.GetBody()), buf_it);
- bucket_name = request.GetBucket();
- object_key = request.GetKey();
- storage_class = request.GetStorageClass();
- server_side_encryption = request.GetServerSideEncryption();
- metadata_map = request.GetMetadata();
- content_type = request.GetContentType();
- fullcontrol_user_list = request.GetGrantFullControl();
- read_user_list = request.GetGrantRead();
- read_acl_user_list = request.GetGrantReadACP();
- write_acl_user_list = request.GetGrantWriteACP();
- write_acl_user_list = request.GetGrantWriteACP();
- canned_acl = request.GetACL();
-
- if (!get_empty_result) {
- put_s3_result.SetVersionId(S3_VERSION);
- put_s3_result.SetETag(S3_ETAG);
- put_s3_result.SetExpiration(S3_EXPIRATION);
- put_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
- }
- return put_s3_result;
- }
-
- std::string bucket_name;
- std::string object_key;
- Aws::S3::Model::StorageClass storage_class;
- Aws::S3::Model::ServerSideEncryption server_side_encryption;
- Aws::S3::Model::PutObjectResult put_s3_result;
- std::string put_s3_data;
- std::map<std::string, std::string> metadata_map;
- std::string content_type;
- std::string fullcontrol_user_list;
- std::string read_user_list;
- std::string read_acl_user_list;
- std::string write_acl_user_list;
- Aws::S3::Model::ObjectCannedACL canned_acl;
- bool get_empty_result = false;
-};
-
-class PutS3ObjectTestsFixture {
- public:
- PutS3ObjectTestsFixture() {
- // Disable retrieving AWS metadata for tests
- #ifdef WIN32
- _putenv_s("AWS_EC2_METADATA_DISABLED", "true");
- #else
- setenv("AWS_EC2_METADATA_DISABLED", "true", 1);
- #endif
-
- LogTestController::getInstance().setDebug<TestPlan>();
- LogTestController::getInstance().setDebug<minifi::core::Processor>();
- LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
- LogTestController::getInstance().setTrace<processors::GetFile>();
- LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
- LogTestController::getInstance().setDebug<processors::LogAttribute>();
-
LogTestController::getInstance().setTrace<minifi::aws::processors::PutS3Object>();
-
- // Build MiNiFi processing graph
- plan = test_controller.createPlan();
- mock_s3_wrapper_ptr = new MockS3Wrapper();
- std::unique_ptr<minifi::aws::s3::S3WrapperBase>
mock_s3_wrapper(mock_s3_wrapper_ptr);
- put_s3_object =
std::make_shared<minifi::aws::processors::PutS3Object>("PutS3Object",
utils::Identifier(), std::move(mock_s3_wrapper));
-
- char input_dir_mask[] = "/tmp/gt.XXXXXX";
- auto input_dir = test_controller.createTempDirectory(input_dir_mask);
- std::ofstream input_file_stream(input_dir +
utils::file::FileUtils::get_separator() + "input_data.log");
- input_file_stream << "input_data";
- input_file_stream.close();
- get_file = plan->addProcessor("GetFile", "GetFile");
- plan->setProperty(get_file, processors::GetFile::Directory.getName(),
input_dir);
- plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(),
"false");
- update_attribute = plan->addProcessor(
- "UpdateAttribute",
- "UpdateAttribute",
- core::Relationship("success", "d"),
- true);
- plan->addProcessor(
- put_s3_object,
- "PutS3Object",
- core::Relationship("success", "d"),
- true);
- plan->addProcessor(
- "LogAttribute",
- "LogAttribute",
- core::Relationship("success", "d"),
- true);
- }
-
- void setBasicCredentials() {
- plan->setProperty(put_s3_object, "Access Key", "key");
- plan->setProperty(put_s3_object, "Secret Key", "secret");
- }
-
- void setBucket() {
- plan->setProperty(update_attribute, "test.bucket", "testBucket", true);
- plan->setProperty(put_s3_object, "Bucket", "${test.bucket}");
- }
-
- void setRequiredProperties() {
- setBasicCredentials();
- setBucket();
- }
-
void checkPutObjectResults() {
REQUIRE(LogTestController::getInstance().contains("key:s3.version value:"
+ S3_VERSION));
REQUIRE(LogTestController::getInstance().contains("key:s3.etag value:" +
S3_ETAG_UNQUOTED));
@@ -165,94 +34,36 @@ class PutS3ObjectTestsFixture {
REQUIRE(!LogTestController::getInstance().contains("key:s3.expiration
value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(!LogTestController::getInstance().contains("key:s3.sseAlgorithm
value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
}
-
- std::string createTempFile(const std::string& filename) {
- char temp_dir[] = "/tmp/gt.XXXXXX";
- auto temp_path = test_controller.createTempDirectory(temp_dir);
- REQUIRE(!temp_path.empty());
- std::string temp_file(temp_path + utils::file::FileUtils::get_separator()
+ filename);
- return temp_file;
- }
-
- virtual ~PutS3ObjectTestsFixture() {
- LogTestController::getInstance().reset();
- }
-
- protected:
- TestController test_controller;
- std::shared_ptr<TestPlan> plan;
- MockS3Wrapper* mock_s3_wrapper_ptr;
- std::shared_ptr<core::Processor> put_s3_object;
- std::shared_ptr<core::Processor> get_file;
- std::shared_ptr<core::Processor> update_attribute;
};
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting",
"[awsCredentials]") {
setBucket();
SECTION("Test property credentials") {
- plan->setProperty(update_attribute, "s3.accessKey", "key", true);
- plan->setProperty(put_s3_object, "Access Key", "${s3.accessKey}");
- plan->setProperty(update_attribute, "s3.secretKey", "secret", true);
- plan->setProperty(put_s3_object, "Secret Key", "${s3.secretKey}");
+ setAccesKeyCredentialsInProcessor();
}
-
SECTION("Test credentials setting from AWS Credentials service") {
- auto aws_cred_service = plan->addController("AWSCredentialsService",
"AWSCredentialsService");
- plan->setProperty(aws_cred_service, "Access Key", "key");
- plan->setProperty(aws_cred_service, "Secret Key", "secret");
- plan->setProperty(put_s3_object, "AWS Credentials Provider service",
"AWSCredentialsService");
+ setAccessKeyCredentialsInController();
+ setCredentialsService();
}
SECTION("Test credentials file setting") {
- char in_dir[] = "/tmp/gt.XXXXXX";
- auto temp_path = test_controller.createTempDirectory(in_dir);
- REQUIRE(!temp_path.empty());
- std::string aws_credentials_file(temp_path +
utils::file::FileUtils::get_separator() + "aws_creds.conf");
- std::ofstream aws_credentials_file_stream(aws_credentials_file);
- aws_credentials_file_stream << "accessKey=key" << std::endl;
- aws_credentials_file_stream << "secretKey=secret" << std::endl;
- aws_credentials_file_stream.close();
- plan->setProperty(put_s3_object, "Credentials File", aws_credentials_file);
+ setCredentialFile(s3_processor);
}
SECTION("Test credentials file setting from AWS Credentials service") {
- char in_dir[] = "/tmp/gt.XXXXXX";
- auto temp_path = test_controller.createTempDirectory(in_dir);
- REQUIRE(!temp_path.empty());
- std::string aws_credentials_file(temp_path +
utils::file::FileUtils::get_separator() + "aws_creds.conf");
- std::ofstream aws_credentials_file_stream(aws_credentials_file);
- aws_credentials_file_stream << "accessKey=key" << std::endl;
- aws_credentials_file_stream << "secretKey=secret" << std::endl;
- aws_credentials_file_stream.close();
- auto aws_cred_service = plan->addController("AWSCredentialsService",
"AWSCredentialsService");
- plan->setProperty(aws_cred_service, "Credentials File",
aws_credentials_file);
- plan->setProperty(put_s3_object, "AWS Credentials Provider service",
"AWSCredentialsService");
+ setCredentialFile(aws_credentials_service);
+ setCredentialsService();
}
SECTION("Test credentials setting using default credential chain") {
- #ifdef WIN32
- _putenv_s("AWS_ACCESS_KEY_ID", "key");
- _putenv_s("AWS_SECRET_ACCESS_KEY", "secret");
- #else
- setenv("AWS_ACCESS_KEY_ID", "key", 1);
- setenv("AWS_SECRET_ACCESS_KEY", "secret", 1);
- #endif
- plan->setProperty(put_s3_object, "Use Default Credentials", "true");
+ setUseDefaultCredentialsChain(s3_processor);
}
SECTION("Test credentials setting from AWS Credentials service using default
credential chain") {
- auto aws_cred_service = plan->addController("AWSCredentialsService",
"AWSCredentialsService");
- plan->setProperty(aws_cred_service, "Use Default Credentials", "true");
- #ifdef WIN32
- _putenv_s("AWS_ACCESS_KEY_ID", "key");
- _putenv_s("AWS_SECRET_ACCESS_KEY", "secret");
- #else
- setenv("AWS_ACCESS_KEY_ID", "key", 1);
- setenv("AWS_SECRET_ACCESS_KEY", "secret", 1);
- #endif
- plan->setProperty(put_s3_object, "AWS Credentials Provider service",
"AWSCredentialsService");
+ setUseDefaultCredentialsChain(aws_credentials_service);
+ setCredentialsService();
}
test_controller.runSession(plan, true);
@@ -265,7 +76,7 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test required
property not set", "[aw
}
SECTION("Test no bucket is set") {
- setBasicCredentials();
+ setAccesKeyCredentialsInProcessor();
}
SECTION("Test no object key is set") {
@@ -275,17 +86,17 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test required
property not set", "[aw
SECTION("Test storage class is empty") {
setRequiredProperties();
- plan->setProperty(put_s3_object, "Storage Class", "");
+ plan->setProperty(s3_processor, "Storage Class", "");
}
SECTION("Test region is empty") {
setRequiredProperties();
- plan->setProperty(put_s3_object, "Region", "");
+ plan->setProperty(s3_processor, "Region", "");
}
SECTION("Test no server side encryption is set") {
setRequiredProperties();
- plan->setProperty(put_s3_object, "Server Side Encryption", "");
+ plan->setProperty(s3_processor, "Server Side Encryption", "");
}
REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
@@ -295,25 +106,25 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default
client configuration",
setRequiredProperties();
test_controller.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:s3.bucket
value:testBucket"));
- REQUIRE(LogTestController::getInstance().contains("key:s3.key
value:input_data.log"));
+ REQUIRE(LogTestController::getInstance().contains("key:s3.key value:" +
INPUT_FILENAME));
REQUIRE(LogTestController::getInstance().contains("key:s3.contenttype
value:application/octet-stream"));
checkPutObjectResults();
- REQUIRE(mock_s3_wrapper_ptr->content_type == "application/octet-stream");
- REQUIRE(mock_s3_wrapper_ptr->storage_class ==
Aws::S3::Model::StorageClass::STANDARD);
- REQUIRE(mock_s3_wrapper_ptr->server_side_encryption ==
Aws::S3::Model::ServerSideEncryption::NOT_SET);
- REQUIRE(mock_s3_wrapper_ptr->canned_acl ==
Aws::S3::Model::ObjectCannedACL::NOT_SET);
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetContentType() ==
"application/octet-stream");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetStorageClass() ==
Aws::S3::Model::StorageClass::STANDARD);
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetServerSideEncryption() ==
Aws::S3::Model::ServerSideEncryption::NOT_SET);
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetACL() ==
Aws::S3::Model::ObjectCannedACL::NOT_SET);
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region ==
minifi::aws::processors::region::US_WEST_2);
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 30000);
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride.empty());
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost.empty());
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName.empty());
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword.empty());
- REQUIRE(mock_s3_wrapper_ptr->put_s3_data == "input_data");
+ REQUIRE(mock_s3_wrapper_ptr->getPutObjectRequestBody() == INPUT_DATA);
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default client configuration
with empty result", "[awsS3ClientConfig]") {
setRequiredProperties();
- mock_s3_wrapper_ptr->get_empty_result = true;
+ mock_s3_wrapper_ptr->returnEmptyS3Result();
test_controller.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:s3.bucket
value:testBucket"));
REQUIRE(LogTestController::getInstance().contains("key:s3.key
value:input_data.log"));
@@ -323,80 +134,70 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default
client configuration wi
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Set non-default client
configuration", "[awsS3ClientConfig]") {
setRequiredProperties();
- plan->setProperty(put_s3_object, "Object Key", "custom_key");
+ plan->setProperty(s3_processor, "Object Key", "custom_key");
plan->setProperty(update_attribute, "test.contentType", "application/tar",
true);
- plan->setProperty(put_s3_object, "Content Type", "${test.contentType}");
- plan->setProperty(put_s3_object, "Storage Class", "ReducedRedundancy");
- plan->setProperty(put_s3_object, "Region",
minifi::aws::processors::region::US_EAST_1);
- plan->setProperty(put_s3_object, "Communications Timeout", "10 Sec");
+ plan->setProperty(s3_processor, "Content Type", "${test.contentType}");
+ plan->setProperty(s3_processor, "Storage Class", "ReducedRedundancy");
+ plan->setProperty(s3_processor, "Region",
minifi::aws::processors::region::US_EAST_1);
+ plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setProperty(update_attribute, "test.endpoint",
"http://localhost:1234", true);
- plan->setProperty(put_s3_object, "Endpoint Override URL",
"${test.endpoint}");
- plan->setProperty(put_s3_object, "Server Side Encryption", "AES256");
+ plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
+ plan->setProperty(s3_processor, "Server Side Encryption", "AES256");
test_controller.runSession(plan, true);
checkPutObjectResults();
REQUIRE(LogTestController::getInstance().contains("key:s3.bucket
value:testBucket"));
REQUIRE(LogTestController::getInstance().contains("key:s3.key
value:custom_key"));
REQUIRE(LogTestController::getInstance().contains("key:s3.contenttype
value:application/tar"));
- REQUIRE(mock_s3_wrapper_ptr->content_type == "application/tar");
- REQUIRE(mock_s3_wrapper_ptr->storage_class ==
Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
- REQUIRE(mock_s3_wrapper_ptr->server_side_encryption ==
Aws::S3::Model::ServerSideEncryption::AES256);
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetContentType() ==
"application/tar");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetStorageClass() ==
Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetServerSideEncryption() ==
Aws::S3::Model::ServerSideEncryption::AES256);
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region ==
minifi::aws::processors::region::US_EAST_1);
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride ==
"http://localhost:1234");
- REQUIRE(mock_s3_wrapper_ptr->put_s3_data == "input_data");
+ REQUIRE(mock_s3_wrapper_ptr->getPutObjectRequestBody() == INPUT_DATA);
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test single user metadata",
"[awsS3MetaData]") {
setRequiredProperties();
- plan->setProperty(put_s3_object, "meta_key", "meta_value", true);
+ plan->setProperty(s3_processor, "meta_key", "meta_value", true);
test_controller.runSession(plan, true);
- REQUIRE(mock_s3_wrapper_ptr->metadata_map.at("meta_key") == "meta_value");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key")
== "meta_value");
REQUIRE(LogTestController::getInstance().contains("key:s3.usermetadata
value:meta_key=meta_value"));
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test multiple user metadata",
"[awsS3MetaData]") {
setRequiredProperties();
- plan->setProperty(put_s3_object, "meta_key1", "meta_value1", true);
- plan->setProperty(put_s3_object, "meta_key2", "meta_value2", true);
+ plan->setProperty(s3_processor, "meta_key1", "meta_value1", true);
+ plan->setProperty(s3_processor, "meta_key2", "meta_value2", true);
test_controller.runSession(plan, true);
- REQUIRE(mock_s3_wrapper_ptr->metadata_map.at("meta_key1") == "meta_value1");
- REQUIRE(mock_s3_wrapper_ptr->metadata_map.at("meta_key2") == "meta_value2");
+
REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key1")
== "meta_value1");
+
REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key2")
== "meta_value2");
REQUIRE(LogTestController::getInstance().contains("key:s3.usermetadata
value:meta_key1=meta_value1,meta_key2=meta_value2"));
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test proxy setting",
"[awsS3Proxy]") {
setRequiredProperties();
- plan->setProperty(update_attribute, "test.proxyHost", "host", true);
- plan->setProperty(put_s3_object, "Proxy Host", "${test.proxyHost}");
- plan->setProperty(update_attribute, "test.proxyPort", "1234", true);
- plan->setProperty(put_s3_object, "Proxy Port", "${test.proxyPort}");
- plan->setProperty(update_attribute, "test.proxyUsername", "username", true);
- plan->setProperty(put_s3_object, "Proxy Username", "${test.proxyUsername}");
- plan->setProperty(update_attribute, "test.proxyPassword", "password", true);
- plan->setProperty(put_s3_object, "Proxy Password", "${test.proxyPassword}");
+ setProxy();
test_controller.runSession(plan, true);
- REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost == "host");
- REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPort == 1234);
- REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName == "username");
- REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword == "password");
+ checkProxySettings();
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test access control setting",
"[awsS3ACL]") {
setRequiredProperties();
plan->setProperty(update_attribute, "s3.permissions.full.users",
"myuserid123, [email protected]", true);
- plan->setProperty(put_s3_object, "FullControl User List",
"${s3.permissions.full.users}");
+ plan->setProperty(s3_processor, "FullControl User List",
"${s3.permissions.full.users}");
plan->setProperty(update_attribute, "s3.permissions.read.users",
"myuserid456,[email protected]", true);
- plan->setProperty(put_s3_object, "Read Permission User List",
"${s3.permissions.read.users}");
+ plan->setProperty(s3_processor, "Read Permission User List",
"${s3.permissions.read.users}");
plan->setProperty(update_attribute, "s3.permissions.readacl.users",
"myuserid789, otheruser", true);
- plan->setProperty(put_s3_object, "Read ACL User List",
"${s3.permissions.readacl.users}");
+ plan->setProperty(s3_processor, "Read ACL User List",
"${s3.permissions.readacl.users}");
plan->setProperty(update_attribute, "s3.permissions.writeacl.users",
"[email protected]", true);
- plan->setProperty(put_s3_object, "Write ACL User List",
"${s3.permissions.writeacl.users}");
+ plan->setProperty(s3_processor, "Write ACL User List",
"${s3.permissions.writeacl.users}");
plan->setProperty(update_attribute, "s3.permissions.cannedacl",
"PublicReadWrite", true);
- plan->setProperty(put_s3_object, "Canned ACL",
"${s3.permissions.cannedacl}");
+ plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}");
test_controller.runSession(plan, true);
- REQUIRE(mock_s3_wrapper_ptr->fullcontrol_user_list == "id=myuserid123,
emailAddress=\"[email protected]\"");
- REQUIRE(mock_s3_wrapper_ptr->read_user_list == "id=myuserid456,
emailAddress=\"[email protected]\"");
- REQUIRE(mock_s3_wrapper_ptr->read_acl_user_list == "id=myuserid789,
id=otheruser");
- REQUIRE(mock_s3_wrapper_ptr->write_acl_user_list ==
"emailAddress=\"[email protected]\"");
- REQUIRE(mock_s3_wrapper_ptr->canned_acl ==
Aws::S3::Model::ObjectCannedACL::public_read_write);
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantFullControl() ==
"id=myuserid123, emailAddress=\"[email protected]\"");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantRead() ==
"id=myuserid456, emailAddress=\"[email protected]\"");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantReadACP() ==
"id=myuserid789, id=otheruser");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantWriteACP() ==
"emailAddress=\"[email protected]\"");
+ REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetACL() ==
Aws::S3::Model::ObjectCannedACL::public_read_write);
}
diff --git a/libminifi/test/aws-tests/S3TestsFixture.h
b/libminifi/test/aws-tests/S3TestsFixture.h
new file mode 100644
index 0000000..7e2e800
--- /dev/null
+++ b/libminifi/test/aws-tests/S3TestsFixture.h
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdlib.h>
+#include <iostream>
+
+#include "core/Processor.h"
+#include "../TestBase.h"
+#include "processors/GetFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "utils/file/FileUtils.h"
+#include "MockS3Wrapper.h"
+
+template<typename T>
+class S3TestsFixture {
+ public:
+ const std::string INPUT_FILENAME = "input_data.log";
+ const std::string INPUT_DATA = "input_data";
+
+ S3TestsFixture() {
+ // Disable retrieving AWS metadata for tests
+ #ifdef WIN32
+ _putenv_s("AWS_EC2_METADATA_DISABLED", "true");
+ #else
+ setenv("AWS_EC2_METADATA_DISABLED", "true", 1);
+ #endif
+
+ LogTestController::getInstance().setDebug<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setTrace<processors::GetFile>();
+ LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
+ LogTestController::getInstance().setDebug<processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<T>();
+
+ // Build MiNiFi processing graph
+ plan = test_controller.createPlan();
+ mock_s3_wrapper_ptr = new MockS3Wrapper();
+ std::unique_ptr<minifi::aws::s3::S3WrapperBase>
mock_s3_wrapper(mock_s3_wrapper_ptr);
+ s3_processor = std::shared_ptr<T>(new T("S3Processor",
utils::Identifier(), std::move(mock_s3_wrapper)));
+
+ char input_dir_mask[] = "/tmp/gt.XXXXXX";
+ auto input_dir = test_controller.createTempDirectory(input_dir_mask);
+ std::ofstream input_file_stream(input_dir +
utils::file::FileUtils::get_separator() + INPUT_FILENAME);
+ input_file_stream << INPUT_DATA;
+ input_file_stream.close();
+ get_file = plan->addProcessor("GetFile", "GetFile");
+ plan->setProperty(get_file, processors::GetFile::Directory.getName(),
input_dir);
+ plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(),
"false");
+ update_attribute = plan->addProcessor(
+ "UpdateAttribute",
+ "UpdateAttribute",
+ core::Relationship("success", "d"),
+ true);
+ plan->addProcessor(
+ s3_processor,
+ "S3Processor",
+ core::Relationship("success", "d"),
+ true);
+ plan->addProcessor(
+ "LogAttribute",
+ "LogAttribute",
+ core::Relationship("success", "d"),
+ true);
+ aws_credentials_service = plan->addController("AWSCredentialsService",
"AWSCredentialsService");
+ }
+
+ void setAccesKeyCredentialsInProcessor() {
+ plan->setProperty(update_attribute, "s3.accessKey", "key", true);
+ plan->setProperty(s3_processor, "Access Key", "${s3.accessKey}");
+ plan->setProperty(update_attribute, "s3.secretKey", "secret", true);
+ plan->setProperty(s3_processor, "Secret Key", "${s3.secretKey}");
+ }
+
+ void setAccessKeyCredentialsInController() {
+ plan->setProperty(aws_credentials_service, "Access Key", "key");
+ plan->setProperty(aws_credentials_service, "Secret Key", "secret");
+ }
+
+ template<typename Component>
+ void setCredentialFile(const Component &component) {
+ char in_dir[] = "/tmp/gt.XXXXXX";
+ auto temp_path = test_controller.createTempDirectory(in_dir);
+ REQUIRE(!temp_path.empty());
+ std::string aws_credentials_file(temp_path +
utils::file::FileUtils::get_separator() + "aws_creds.conf");
+ std::ofstream aws_credentials_file_stream(aws_credentials_file);
+ aws_credentials_file_stream << "accessKey=key" << std::endl;
+ aws_credentials_file_stream << "secretKey=secret" << std::endl;
+ aws_credentials_file_stream.close();
+ plan->setProperty(component, "Credentials File", aws_credentials_file);
+ }
+
+ template<typename Component>
+ void setUseDefaultCredentialsChain(const Component &component) {
+ #ifdef WIN32
+ _putenv_s("AWS_ACCESS_KEY_ID", "key");
+ _putenv_s("AWS_SECRET_ACCESS_KEY", "secret");
+ #else
+ setenv("AWS_ACCESS_KEY_ID", "key", 1);
+ setenv("AWS_SECRET_ACCESS_KEY", "secret", 1);
+ #endif
+ plan->setProperty(component, "Use Default Credentials", "true");
+ }
+
+ void setCredentialsService() {
+ plan->setProperty(s3_processor, "AWS Credentials Provider service",
"AWSCredentialsService");
+ }
+
+ void setBucket() {
+ plan->setProperty(update_attribute, "test.bucket", "testBucket", true);
+ plan->setProperty(s3_processor, "Bucket", "${test.bucket}");
+ }
+
+ void setRequiredProperties() {
+ setAccesKeyCredentialsInProcessor();
+ setBucket();
+ }
+
+ void setProxy() {
+ plan->setProperty(update_attribute, "test.proxyHost", "host", true);
+ plan->setProperty(s3_processor, "Proxy Host", "${test.proxyHost}");
+ plan->setProperty(update_attribute, "test.proxyPort", "1234", true);
+ plan->setProperty(s3_processor, "Proxy Port", "${test.proxyPort}");
+ plan->setProperty(update_attribute, "test.proxyUsername", "username",
true);
+ plan->setProperty(s3_processor, "Proxy Username", "${test.proxyUsername}");
+ plan->setProperty(update_attribute, "test.proxyPassword", "password",
true);
+ plan->setProperty(s3_processor, "Proxy Password", "${test.proxyPassword}");
+ }
+
+ void checkProxySettings() {
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost == "host");
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPort == 1234);
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName ==
"username");
+ REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword ==
"password");
+ }
+
+ virtual ~S3TestsFixture() {
+ LogTestController::getInstance().reset();
+ }
+
+ protected:
+ TestController test_controller;
+ std::shared_ptr<TestPlan> plan;
+ MockS3Wrapper* mock_s3_wrapper_ptr;
+ std::shared_ptr<core::Processor> s3_processor;
+ std::shared_ptr<core::Processor> get_file;
+ std::shared_ptr<core::Processor> update_attribute;
+ std::shared_ptr<core::controller::ControllerServiceNode>
aws_credentials_service;
+};