This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 07781b8  MINIFICPP-1390 Create DeleteS3Object processor
07781b8 is described below

commit 07781b8f017e9d6f6de5e4857a91f04e5b9e88f8
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Jan 5 13:47:07 2021 +0100

    MINIFICPP-1390 Create DeleteS3Object processor
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #931
---
 PROCESSORS.md                                      |  36 ++-
 README.md                                          |  44 +--
 docker/test/integration/minifi/__init__.py         |  20 ++
 docker/test/integration/minifi/test/__init__.py    |   7 +-
 .../{test_puts3object.py => test_s3.py}            |  55 +++-
 extensions/aws/AWSLoader.h                         |   4 +
 extensions/aws/processors/DeleteS3Object.cpp       |  88 ++++++
 extensions/aws/processors/DeleteS3Object.h         |  81 ++++++
 extensions/aws/processors/PutS3Object.cpp          | 231 +---------------
 extensions/aws/processors/PutS3Object.h            |  83 ++----
 extensions/aws/processors/S3Processor.cpp          | 262 ++++++++++++++++++
 .../processors/{PutS3Object.h => S3Processor.h}    | 111 ++------
 extensions/aws/s3/S3Wrapper.cpp                    |  22 +-
 extensions/aws/s3/S3Wrapper.h                      |   1 +
 extensions/aws/s3/S3WrapperBase.cpp                |  10 +
 extensions/aws/s3/S3WrapperBase.h                  |   4 +
 libminifi/test/aws-tests/DeleteS3ObjectTests.cpp   | 133 +++++++++
 libminifi/test/aws-tests/MockS3Wrapper.h           |  80 ++++++
 libminifi/test/aws-tests/PutS3ObjectTests.cpp      | 301 ++++-----------------
 libminifi/test/aws-tests/S3TestsFixture.h          | 167 ++++++++++++
 20 files changed, 1082 insertions(+), 658 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index f94b7d7..d9ea56e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -10,6 +10,7 @@
 - [CaptureRTSPFrame](#capturertspframe)
 - [CompressContent](#compresscontent)
 - [ConsumeMQTT](#consumemqtt)
+- [DeleteS3Object](#deletes3object)
 - [ExecuteProcess](#executeprocess)
 - [ExecutePythonProcessor](#executepythonprocessor)
 - [ExecuteSQL](#executesql)
@@ -208,6 +209,39 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 |success|FlowFiles that are sent successfully to the destination are 
transferred to this relationship|
 
 
+## DeleteS3Object
+
+### Description
+
+Deletes FlowFiles on an Amazon S3 Bucket. If attempting to delete a file that 
does not exist, FlowFile is routed to success.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Object Key|||The key of the S3 object. If none is given the filename 
attribute will be used by default.<br/>**Supports Expression Language: true**|
+|**Bucket**|||The S3 bucket<br/>**Supports Expression Language: true**|
+|Access Key|||AWS account access key<br/>**Supports Expression Language: 
true**|
+|Secret Key|||AWS account secret key<br/>**Supports Expression Language: 
true**|
+|Credentials File|||Path to a file containing AWS access key and secret key in 
properties file format. Properties used: accessKey and secretKey|
+|AWS Credentials Provider service|||The name of the AWS Credentials Provider 
controller service that is used to obtain AWS credentials.|
+|**Region**|us-west-2|af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-southeast-1<br/>ap-southeast-2<br/>ca-central-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-north-1<br/>eu-south-1<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>me-south-1<br/>sa-east-1<br/>us-east-1<br/>us-east-2<br/>us-gov-east-1<br/>us-gov-west-1<br/>us-west-1<br/>us-west-2|AWS
 Region|
+|**Communications Timeout**|30 sec||Sets the timeout of the communication 
between the AWS server and the client|
+|Endpoint Override URL|||Endpoint URL to use instead of the AWS default 
including scheme, host, port, and path. The AWS libraries select an endpoint 
URL based on the AWS region, but this property overrides the selected endpoint 
URL, allowing use with other S3-compatible endpoints.<br/>**Supports Expression 
Language: true**|
+|Proxy Host|||Proxy host name or IP<br/>**Supports Expression Language: true**|
+|Proxy Port|||The port number of the proxy host<br/>**Supports Expression 
Language: true**|
+|Proxy Username|||Username to set when authenticating against 
proxy<br/>**Supports Expression Language: true**|
+|Proxy Password|||Password to set when authenticating against 
proxy<br/>**Supports Expression Language: true**|
+|Version|||The Version of the Object to delete<br/>**Supports Expression 
Language: true**|
+### Relationships
+
+| Name | Description |
+| - | - |
+|failure|FlowFiles are routed to failure relationship|
+|success|FlowFiles are routed to success relationship|
+
+
 ## ExecuteProcess
 
 ### Description
@@ -964,7 +998,7 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 |AWS Credentials Provider service|||The name of the AWS Credentials Provider 
controller service that is used to obtain AWS credentials.|
 |**Storage 
Class**|Standard|Standard<br/>ReducedRedundancy<br/>StandardIA<br/>OnezoneIA<br/>IntelligentTiering<br/>Glacier<br/>DeepArchive|AWS
 S3 Storage Class|
 
|**Region**|us-west-2|af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-southeast-1<br/>ap-southeast-2<br/>ca-central-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-north-1<br/>eu-south-1<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>me-south-1<br/>sa-east-1<br/>us-east-1<br/>us-east-2<br/>us-gov-east-1<br/>us-gov-west-1<br/>us-west-1<br/>us-west-2|AWS
 Region|
-|**Communications Timeout**|30 sec|||
+|**Communications Timeout**|30 sec||Sets the timeout of the communication 
between the AWS server and the client|
 |FullControl User List|||A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Full Control for an 
object.<br/>**Supports Expression Language: true**|
 |Read Permission User List|||A comma-separated list of Amazon User ID's or 
E-mail addresses that specifies who should have Read Access for an 
object.<br/>**Supports Expression Language: true**|
 |Read ACL User List|||A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to read the Access Control 
List for an object.<br/>**Supports Expression Language: true**|
diff --git a/README.md b/README.md
index 5d04b3f..28d39d1 100644
--- a/README.md
+++ b/README.md
@@ -59,21 +59,21 @@ A subset of the Apache NiFi [Expression 
Language](EXPRESSIONS.md) is supported.
 
 MiNiFi - C++ supports the following C++ processors:
 
-The following table lists the base set of processors.  
+The following table lists the base set of processors.
 
 | Extension Set        | Processors           |
 | ------------- |:-------------|
 | **Base**    | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
 
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](PROCESSORS.md#putfile)<br/>[RetryFlowFile](PROCESSOR
 [...]
 
-The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Extensions that are enabled by default ( such as CURL ), can be disabled with 
the respective CMAKE flag on the command line. 
+The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Extensions that are enabled by default ( such as CURL ), can be disabled with 
the respective CMAKE flag on the command line.
 
-Through JNI extensions you can run NiFi processors using NARs. The JNI 
extension set allows you to run these Java processors. MiNiFi C++ will favor 
C++ implementations over Java implements. In the case where a processor is 
implemented in either language, the one in C++ will be selected; however, will 
remain transparent to the consumer. 
+Through JNI extensions you can run NiFi processors using NARs. The JNI 
extension set allows you to run these Java processors. MiNiFi C++ will favor 
C++ implementations over Java implements. In the case where a processor is 
implemented in either language, the one in C++ will be selected; however, will 
remain transparent to the consumer.
 
 
 | Extension Set        | Processors           | CMAKE Flag  |
 | ------------- |:-------------| :-----|
 | Archive Extensions    | 
[ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)
      |   -DBUILD_LIBARCHIVE=ON |
-| AWS | [AWSCredentialsService](CONTROLLERS.md#awsCredentialsService) | 
-DENABLE_AWS=ON  |
+| AWS | 
[AWSCredentialsService](CONTROLLERS.md#awsCredentialsService)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)
 | -DENABLE_AWS=ON  |
 | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp)  | -DDISABLE_CIVET=ON |
 | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp)      |    -DDISABLE_CURL=ON  |
 | GPS | GetGPS      |    -DENABLE_GPS=ON  |
@@ -92,7 +92,7 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 | USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera)     |    
-DENABLE_USB_CAMERA=ON  |
 | Windows Event Log (Windows only) | 
CollectorInitiatedSubscription<br/>ConsumeWindowsEventLog<br/>TailEventLog | 
-DENABLE_WEL=ON |
 
- Please see our [Python guide](extensions/script/README.md) on how to write 
Python processors and use them within MiNiFi C++. 
+ Please see our [Python guide](extensions/script/README.md) on how to write 
Python processors and use them within MiNiFi C++.
 
 ## Caveats
 * We follow semver with regards to API compatibility, but no ABI compatibility 
is provided. See [semver's website](https://semver.org/) for more information
@@ -121,11 +121,11 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 The following utilities are needed to build external projects, when bundled
 versions of LibreSSL, cURL, or zlib are used:
 
-* patch 
-* autoconf 
-* automake 
-* libtool 
-  
+* patch
+* autoconf
+* automake
+* libtool
+
 **NOTE** if Lua support is enabled, then a C++ compiler with support for 
c++-14 must be used. If using GCC, version 6.x
 or greater is recommended.
 
@@ -133,7 +133,7 @@ or greater is recommended.
 
 **NOTE** if Kafka support is enabled, a recent version of a compiler 
supporting C++-11 regexes must be used. GCC versions >= 4.9.x are recommended.
 
-**NOTE** if Expression Language support is enabled, FlexLexer must be in the 
include path and the version must be compatible with the version of flex used 
when generating lexer sources. Lexer source generation is automatically 
performed during CMake builds. To re-generate the sources, remove: 
+**NOTE** if Expression Language support is enabled, FlexLexer must be in the 
include path and the version must be compatible with the version of flex used 
when generating lexer sources. Lexer source generation is automatically 
performed during CMake builds. To re-generate the sources, remove:
 
  * extensions/expression-language/Parser.cpp
  * extensions/expression-language/Parser.hpp
@@ -194,7 +194,7 @@ On all distributions please use -DUSE_SHARED_LIBS=OFF to 
statically link zlib, l
 #### Windows
   Build and Installation has been tested with Windows 10 using Visual Studio 
2017. You can build
   and create an MSI via the CPACK command. This requires the installation of 
the WiX
-  toolset (http://wixtoolset.org/). To do this, open up a prompt into your 
build directory and 
+  toolset (http://wixtoolset.org/). To do this, open up a prompt into your 
build directory and
   type 'cpack' . The CPACK command will automatically generate and provide you 
a path to the distributable
   msi file. See [Windows.md](Windows.md) for more details.
 
@@ -218,7 +218,7 @@ The needed dependencies can be installed with the following 
commands for:
 ##### Yum based Linux Distributions
 
 **NOTE** if a newer compiler is required, such as when Lua support is enabled, 
it is recommended to use a newer compiler
-using a devtools-* package from the Software Collections (SCL). 
+using a devtools-* package from the Software Collections (SCL).
 
 ```
 # ~/Development/code/apache/nifi-minifi-cpp on git:master
@@ -373,11 +373,11 @@ $ # It is recommended that you install bison from source 
as HomeBrew now uses an
     * Extension cannot be installed due to
       version of cmake or other software, or
       incompatibility with other extensions
-    
-    Enter choice [ A - W or 1-4 ] 
+
+    Enter choice [ A - W or 1-4 ]
   ```
 
-- Boostrap now saves state between runs. State will automatically be saved. 
Provide -c or --clear to clear this state. The -i option provides a guided menu 
install with the ability to change 
+- Boostrap now saves state between runs. State will automatically be saved. 
Provide -c or --clear to clear this state. The -i option provides a guided menu 
install with the ability to change
 advanced features.
 
 ### Building
@@ -392,7 +392,7 @@ advanced features.
 
 - Perform a `cmake ..` to generate the project files
   - Optionally disable or enable extensions. Please visit our guide 
[extensions guide](Extensions.md) for flags or our wiki entry on
-    [customizing 
builds](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143)
 for more information on this topic. 
+    [customizing 
builds](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74685143)
 for more information on this topic.
   ```
   # ~/Development/code/apache/nifi-minifi-cpp on git:master
   $ cmake ..
@@ -513,7 +513,7 @@ from the project directory. Further instructions are 
available in the [Snapcraft
 
 ### Configuring
 The 'conf' directory in the root contains a template config.yml document, 
minifi.properties, and minifi-log.properties. Please see our [Configuration 
document](CONFIGURE.md) for details on how to configure agents.
-         
+
 ### Running
 After completing a [build](#building), the application can be run by issuing 
the following from :
 
@@ -521,7 +521,7 @@ After completing a [build](#building), the application can 
be run by issuing the
 
 By default, this will make use of a config.yml located in the conf directory.  
This configuration file location can be altered by adjusting the property 
`nifi.flow.configuration.file` in minifi.properties located in the conf 
directory.
 
-### Stopping  
+### Stopping
 
 MiNiFi can then be stopped by issuing:
 
@@ -537,12 +537,12 @@ MiNiFi can also be installed as a system service using 
minifi.sh with an optiona
 MiNiFi C++ comes with a deployment script. This will build and package minifi. 
Additionally, a file named build_output will be
 created within the build directory that contains a manifest of build artifacts.
 
-    $ deploy.sh <build identifier> 
+    $ deploy.sh <build identifier>
 
 The build identifier will be carried with the deployed binary for the 
configuration you specify. By default all extensions will be built.
 
-On Windows it is suggested that MSI be used for installation. 
-    
+On Windows it is suggested that MSI be used for installation.
+
 ### Extensions
 
 Please see [Extensions.md](Extensions.md) on how to build and run 
conditionally built dependencies and extensions.
diff --git a/docker/test/integration/minifi/__init__.py 
b/docker/test/integration/minifi/__init__.py
index 18efddd..b64d541 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -608,6 +608,26 @@ class PutS3Object(Processor):
                                           },
                                           auto_terminate=['success'])
 
+class DeleteS3Object(Processor):
+    def __init__(self,
+                 proxy_host='',
+                 proxy_port='',
+                 proxy_username='',
+                 proxy_password=''):
+        super(DeleteS3Object, self).__init__('DeleteS3Object',
+                                          properties={
+                                            'Object Key': 'test_object_key',
+                                            'Bucket': 'test_bucket',
+                                            'Access Key': 'test_access_key',
+                                            'Secret Key': 'test_secret',
+                                            'Endpoint Override URL': 
"http://s3-server:9090";,
+                                            'Proxy Host': proxy_host,
+                                            'Proxy Port': proxy_port,
+                                            'Proxy Username': proxy_username,
+                                            'Proxy Password': proxy_password,
+                                          },
+                                          auto_terminate=['success'])
+
 class InputPort(Connectable):
     def __init__(self, name=None, remote_process_group=None):
         super(InputPort, self).__init__(name=name)
diff --git a/docker/test/integration/minifi/test/__init__.py 
b/docker/test/integration/minifi/test/__init__.py
index 6dbef12..bf64472 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -205,7 +205,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
         return url in output and \
             ((output.count("TCP_DENIED/407") != 0 and \
               output.count("TCP_MISS/200") == output.count("TCP_DENIED/407")) 
or \
-             output.count("TCP_DENIED/407") == 0 and "TCP_MISS/200" in output)
+             output.count("TCP_DENIED/407") == 0 and "TCP_MISS" in output)
 
     def check_s3_server_object_data(self):
         s3_mock_dir = subprocess.check_output(["docker", "exec", "s3-server", 
"find", "/tmp/", "-type", "d", "-name", 
"s3mock*"]).decode(sys.stdout.encoding).strip()
@@ -218,6 +218,11 @@ class DockerTestCluster(SingleNodeDockerCluster):
         server_metadata = json.loads(metadata_json)
         return server_metadata["contentType"] == content_type and metadata == 
server_metadata["userMetadata"]
 
+    def is_s3_bucket_empty(self):
+        s3_mock_dir = subprocess.check_output(["docker", "exec", "s3-server", 
"find", "/tmp/", "-type", "d", "-name", 
"s3mock*"]).decode(sys.stdout.encoding).strip()
+        ls_result = subprocess.check_output(["docker", "exec", "s3-server", 
"ls", s3_mock_dir + "/test_bucket/"]).decode(sys.stdout.encoding)
+        return not ls_result
+
     def rm_out_child(self, dir):
         logging.info('Removing %s from output folder', 
os.path.join(self.tmp_test_output_dir, dir))
         shutil.rmtree(os.path.join(self.tmp_test_output_dir, dir))
diff --git a/docker/test/integration/test_puts3object.py 
b/docker/test/integration/test_s3.py
similarity index 54%
rename from docker/test/integration/test_puts3object.py
rename to docker/test/integration/test_s3.py
index 618fe7a..d961003 100644
--- a/docker/test/integration/test_puts3object.py
+++ b/docker/test/integration/test_s3.py
@@ -17,7 +17,7 @@ from minifi import *
 from minifi.test import *
 
 
-def test_puts3object():
+def test_put_s3_object():
     """
     Verify delivery of S3 object to AWS server
     """
@@ -35,7 +35,7 @@ def test_puts3object():
         assert cluster.check_s3_server_object_data()
         assert cluster.check_s3_server_object_metadata()
 
-def test_puts3object_proxy():
+def test_put_s3_object_proxy():
     """
     Verify delivery of S3 object to AWS server through proxy server
     """
@@ -58,3 +58,54 @@ def test_puts3object_proxy():
         assert 
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key";)
         assert cluster.check_s3_server_object_data()
         assert cluster.check_s3_server_object_metadata()
+
+def test_delete_s3_object():
+    """
+    Verify deletion of S3 object
+    """
+    flow = (GetFile('/tmp/input') >> PutS3Object() \
+            >> LogAttribute() \
+            >> DeleteS3Object() \
+            >> PutFile('/tmp/output/success'))
+
+    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
+        cluster.put_test_data('test_data')
+        cluster.deploy_flow(None, engine='s3-server')
+        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
+        assert cluster.check_output(60)
+        assert cluster.is_s3_bucket_empty()
+
+def test_delete_s3_non_existing_object():
+    """
+    Verify deletion of a non-existing S3 object should succeed
+    """
+    flow = (GetFile('/tmp/input')
+            >> DeleteS3Object() \
+            >> PutFile('/tmp/output/success'))
+
+    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
+        cluster.put_test_data('test_data')
+        cluster.deploy_flow(None, engine='s3-server')
+        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
+        assert cluster.check_output(60)
+
+def test_delete_s3_object_proxy():
+    """
+    Verify deletion of S3 object through proxy server
+    """
+    flow = (GetFile('/tmp/input') >> PutS3Object() \
+            >> LogAttribute() \
+            >> DeleteS3Object(proxy_host='http-proxy',
+                              proxy_port='3128',
+                              proxy_username='admin',
+                              proxy_password='test101') \
+            >> PutFile('/tmp/output/success'))
+
+    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
+        cluster.put_test_data('test_data')
+        cluster.deploy_flow(None, engine='s3-server')
+        cluster.deploy_flow(None, engine='http-proxy')
+        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
+        assert cluster.check_output(60)
+        assert cluster.is_s3_bucket_empty()
+        assert 
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key";)
diff --git a/extensions/aws/AWSLoader.h b/extensions/aws/AWSLoader.h
index ed0d488..eb0d684 100644
--- a/extensions/aws/AWSLoader.h
+++ b/extensions/aws/AWSLoader.h
@@ -26,6 +26,7 @@
 #include "utils/StringUtils.h"
 #include "controllerservices/AWSCredentialsService.h"
 #include "processors/PutS3Object.h"
+#include "processors/DeleteS3Object.h"
 
 class AWSObjectFactory : public core::ObjectFactory {
  public:
@@ -51,6 +52,7 @@ class AWSObjectFactory : public core::ObjectFactory {
     std::vector<std::string> class_names;
     class_names.push_back("AWSCredentialsService");
     class_names.push_back("PutS3Object");
+    class_names.push_back("DeleteS3Object");
     return class_names;
   }
 
@@ -59,6 +61,8 @@ class AWSObjectFactory : public core::ObjectFactory {
       return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::aws::controllers::AWSCredentialsService>());
     } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"PutS3Object")) {
       return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::aws::processors::PutS3Object>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"DeleteS3Object")) {
+      return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::aws::processors::DeleteS3Object>());
     } else {
       return nullptr;
     }
diff --git a/extensions/aws/processors/DeleteS3Object.cpp 
b/extensions/aws/processors/DeleteS3Object.cpp
new file mode 100644
index 0000000..cbdab68
--- /dev/null
+++ b/extensions/aws/processors/DeleteS3Object.cpp
@@ -0,0 +1,88 @@
+/**
+ * @file DeleteS3Object.cpp
+ * DeleteS3Object class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteS3Object.h"
+
+#include <set>
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const core::Property DeleteS3Object::Version(
+  core::PropertyBuilder::createProperty("Version")
+    ->withDescription("The Version of the Object to delete")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+const core::Relationship DeleteS3Object::Success("success", "FlowFiles are 
routed to success relationship");
+const core::Relationship DeleteS3Object::Failure("failure", "FlowFiles are 
routed to failure relationship");
+
+void DeleteS3Object::initialize() {
+  // Add new supported properties
+  updateSupportedProperties({Version});
+  // Set the supported relationships
+  setSupportedRelationships({Failure, Success});
+}
+
+bool DeleteS3Object::getExpressionLanguageSupportedProperties(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  if (!S3Processor::getExpressionLanguageSupportedProperties(context, 
flow_file)) {
+    return false;
+  }
+
+  context->getProperty(Version, version_, flow_file);
+  logger_->log_debug("DeleteS3Object: Version [%s]", version_);
+  return true;
+}
+
+void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+  logger_->log_debug("DeleteS3Object onTrigger");
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  if (!getExpressionLanguageSupportedProperties(context, flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (s3_wrapper_->deleteObject(bucket_, object_key_, version_)) {
+    logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'", 
object_key_, bucket_);
+    session->transfer(flow_file, Success);
+  } else {
+    logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'", 
object_key_, bucket_);
+    session->transfer(flow_file, Failure);
+  }
+}
+
+}  // namespace processors
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/processors/DeleteS3Object.h 
b/extensions/aws/processors/DeleteS3Object.h
new file mode 100644
index 0000000..73cf507
--- /dev/null
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -0,0 +1,81 @@
+/**
+ * @file DeleteS3Object.h
+ * DeleteS3Object class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sstream>
+#include <utility>
+#include <memory>
+#include <string>
+
+#include "S3Processor.h"
+#include "utils/GeneralUtils.h"
+
+template<typename T>
+class S3TestsFixture;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+class DeleteS3Object : public S3Processor {
+ public:
+  static constexpr char const* ProcessorName = "DeleteS3Object";
+
+  // Supported Properties
+  static const core::Property Version;
+
+  // Supported Relationships
+  static const core::Relationship Failure;
+  static const core::Relationship Success;
+
+  explicit DeleteS3Object(std::string name, minifi::utils::Identifier uuid = 
minifi::utils::Identifier())
+    : S3Processor(std::move(name), uuid, 
logging::LoggerFactory<DeleteS3Object>::getLogger()) {
+  }
+
+  ~DeleteS3Object() override = default;
+
+  void initialize() override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
+
+ protected:
+  bool getExpressionLanguageSupportedProperties(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file) override;
+
+ private:
+  friend class ::S3TestsFixture<DeleteS3Object>;
+
+  explicit DeleteS3Object(std::string name, minifi::utils::Identifier uuid, 
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+    : S3Processor(std::move(name), uuid, 
logging::LoggerFactory<DeleteS3Object>::getLogger(), std::move(s3_wrapper)) {
+  }
+
+  std::string version_;
+};
+
+REGISTER_RESOURCE(DeleteS3Object, "This Processor deletes FlowFiles on an 
Amazon S3 Bucket.");
+
+}  // namespace processors
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/processors/PutS3Object.cpp 
b/extensions/aws/processors/PutS3Object.cpp
index 369283d..b106549 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -23,7 +23,7 @@
 #include <string>
 #include <set>
 #include <memory>
-#include <map>
+#include <utility>
 
 #include "AWSCredentialsService.h"
 #include "properties/Properties.h"
@@ -41,25 +41,9 @@ const uint64_t PutS3Object::ReadCallback::MAX_SIZE = 5UL * 
1024UL * 1024UL * 102
 const uint64_t PutS3Object::ReadCallback::BUFFER_SIZE = 4096;
 
 const std::set<std::string> 
PutS3Object::CANNED_ACLS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::CANNED_ACL_MAP));
-const std::set<std::string> PutS3Object::REGIONS({region::AF_SOUTH_1, 
region::AP_EAST_1, region::AP_NORTHEAST_1,
-  region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1, 
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
-  region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1, 
region::EU_CENTRAL_1, region::EU_NORTH_1,
-  region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3, 
region::ME_SOUTH_1, region::SA_EAST_1,
-  region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1, 
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
 const std::set<std::string> 
PutS3Object::STORAGE_CLASSES(minifi::utils::MapUtils::getKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
 const std::set<std::string> 
PutS3Object::SERVER_SIDE_ENCRYPTIONS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
 
-const core::Property PutS3Object::ObjectKey(
-  core::PropertyBuilder::createProperty("Object Key")
-    ->withDescription("The key of the S3 object. If none is given the filename 
attribute will be used by default.")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::Bucket(
-  core::PropertyBuilder::createProperty("Bucket")
-    ->withDescription("The S3 bucket")
-    ->isRequired(true)
-    ->supportsExpressionLanguage(true)
-    ->build());
 const core::Property PutS3Object::ContentType(
   core::PropertyBuilder::createProperty("Content Type")
     ->withDescription("Sets the Content-Type HTTP header indicating the type 
of content stored in "
@@ -69,24 +53,6 @@ const core::Property PutS3Object::ContentType(
     ->supportsExpressionLanguage(true)
     ->withDefaultValue<std::string>("application/octet-stream")
     ->build());
-const core::Property PutS3Object::AccessKey(
-  core::PropertyBuilder::createProperty("Access Key")
-    ->withDescription("AWS account access key")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::SecretKey(
-  core::PropertyBuilder::createProperty("Secret Key")
-    ->withDescription("AWS account secret key")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::CredentialsFile(
-  core::PropertyBuilder::createProperty("Credentials File")
-    ->withDescription("Path to a file containing AWS access key and secret key 
in properties file format. Properties used: accessKey and secretKey")
-    ->build());
-const core::Property PutS3Object::AWSCredentialsProviderService(
-  core::PropertyBuilder::createProperty("AWS Credentials Provider service")
-    ->withDescription("The name of the AWS Credentials Provider controller 
service that is used to obtain AWS credentials.")
-    ->build());
 const core::Property PutS3Object::StorageClass(
   core::PropertyBuilder::createProperty("Storage Class")
     ->isRequired(true)
@@ -94,19 +60,6 @@ const core::Property PutS3Object::StorageClass(
     ->withAllowableValues<std::string>(PutS3Object::STORAGE_CLASSES)
     ->withDescription("AWS S3 Storage Class")
     ->build());
-const core::Property PutS3Object::Region(
-  core::PropertyBuilder::createProperty("Region")
-    ->isRequired(true)
-    ->withDefaultValue<std::string>(region::US_WEST_2)
-    ->withAllowableValues<std::string>(PutS3Object::REGIONS)
-    ->withDescription("AWS Region")
-    ->build());
-const core::Property PutS3Object::CommunicationsTimeout(
-  core::PropertyBuilder::createProperty("Communications Timeout")
-    ->isRequired(true)
-    ->withDefaultValue<core::TimePeriodValue>("30 sec")
-    ->withDescription("")
-    ->build());
 const core::Property PutS3Object::FullControlUserList(
   core::PropertyBuilder::createProperty("FullControl User List")
     ->withDescription("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Full Control for an object.")
@@ -135,14 +88,6 @@ const core::Property PutS3Object::CannedACL(
                       "PublicReadWrite, PublicRead, Private, AwsExecRead; will 
be ignored if any other ACL/permission property is specified.")
     ->supportsExpressionLanguage(true)
     ->build());
-const core::Property PutS3Object::EndpointOverrideURL(
-  core::PropertyBuilder::createProperty("Endpoint Override URL")
-    ->withDescription("Endpoint URL to use instead of the AWS default 
including scheme, host, "
-                      "port, and path. The AWS libraries select an endpoint 
URL based on the AWS "
-                      "region, but this property overrides the selected 
endpoint URL, allowing use "
-                      "with other S3-compatible endpoints.")
-    ->supportsExpressionLanguage(true)
-    ->build());
 const core::Property PutS3Object::ServerSideEncryption(
   core::PropertyBuilder::createProperty("Server Side Encryption")
     ->isRequired(true)
@@ -150,106 +95,16 @@ const core::Property PutS3Object::ServerSideEncryption(
     ->withAllowableValues<std::string>(PutS3Object::SERVER_SIDE_ENCRYPTIONS)
     ->withDescription("Specifies the algorithm used for server side 
encryption.")
     ->build());
-const core::Property PutS3Object::ProxyHost(
-  core::PropertyBuilder::createProperty("Proxy Host")
-    ->withDescription("Proxy host name or IP")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::ProxyPort(
-  core::PropertyBuilder::createProperty("Proxy Port")
-    ->withDescription("The port number of the proxy host")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::ProxyUsername(
-    core::PropertyBuilder::createProperty("Proxy Username")
-    ->withDescription("Username to set when authenticating against proxy")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::ProxyPassword(
-  core::PropertyBuilder::createProperty("Proxy Password")
-    ->withDescription("Password to set when authenticating against proxy")
-    ->supportsExpressionLanguage(true)
-    ->build());
-const core::Property PutS3Object::UseDefaultCredentials(
-    core::PropertyBuilder::createProperty("Use Default Credentials")
-    ->withDescription("If true, uses the Default Credential chain, including 
EC2 instance profiles or roles, environment variables, default user 
credentials, etc.")
-    ->withDefaultValue<bool>(false)
-    ->isRequired(true)
-    ->build());
 
 const core::Relationship PutS3Object::Success("success", "FlowFiles are routed 
to success relationship");
 const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed 
to failure relationship");
 
 void PutS3Object::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(ObjectKey);
-  properties.insert(Bucket);
-  properties.insert(ContentType);
-  properties.insert(AccessKey);
-  properties.insert(SecretKey);
-  properties.insert(CredentialsFile);
-  properties.insert(AWSCredentialsProviderService);
-  properties.insert(StorageClass);
-  properties.insert(Region);
-  properties.insert(CommunicationsTimeout);
-  properties.insert(FullControlUserList);
-  properties.insert(ReadPermissionUserList);
-  properties.insert(ReadACLUserList);
-  properties.insert(WriteACLUserList);
-  properties.insert(CannedACL);
-  properties.insert(EndpointOverrideURL);
-  properties.insert(ServerSideEncryption);
-  properties.insert(ProxyHost);
-  properties.insert(ProxyPort);
-  properties.insert(ProxyUsername);
-  properties.insert(ProxyPassword);
-  properties.insert(UseDefaultCredentials);
-  setSupportedProperties(properties);
+  // Add new supported properties
+  updateSupportedProperties({ContentType, StorageClass, FullControlUserList, 
ReadPermissionUserList,
+    ReadACLUserList, WriteACLUserList, CannedACL, ServerSideEncryption});
   // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Failure);
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
-}
-
-minifi::utils::optional<Aws::Auth::AWSCredentials> 
PutS3Object::getAWSCredentialsFromControllerService(const 
std::shared_ptr<core::ProcessContext> &context) const {
-  std::string service_name;
-  if (context->getProperty(AWSCredentialsProviderService.getName(), 
service_name) && !service_name.empty()) {
-    std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(service_name);
-    if (nullptr != service) {
-      auto aws_credentials_service = 
std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
-      if (aws_credentials_service) {
-        return 
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
-      }
-    }
-  }
-  return minifi::utils::nullopt;
-}
-
-minifi::utils::optional<Aws::Auth::AWSCredentials> 
PutS3Object::getAWSCredentials(
-    const std::shared_ptr<core::ProcessContext> &context,
-    const std::shared_ptr<core::FlowFile> &flow_file) {
-  auto service_cred = getAWSCredentialsFromControllerService(context);
-  if (service_cred) {
-    logger_->log_info("AWS Credentials successfully set from controller 
service");
-    return service_cred.value();
-  }
-
-  std::string access_key;
-  context->getProperty(AccessKey, access_key, flow_file);
-  aws_credentials_provider_.setAccessKey(access_key);
-  std::string secret_key;
-  context->getProperty(SecretKey, secret_key, flow_file);
-  aws_credentials_provider_.setSecretKey(secret_key);
-  std::string credential_file;
-  context->getProperty(CredentialsFile.getName(), credential_file);
-  aws_credentials_provider_.setCredentialsFile(credential_file);
-  bool use_default_credentials = false;
-  context->getProperty(UseDefaultCredentials.getName(), 
use_default_credentials);
-  aws_credentials_provider_.setUseDefaultCredentials(use_default_credentials);
-
-  return aws_credentials_provider_.getAWSCredentials();
+  setSupportedRelationships({Failure, Success});
 }
 
 void PutS3Object::fillUserMetadata(const std::shared_ptr<core::ProcessContext> 
&context) {
@@ -271,28 +126,8 @@ void PutS3Object::fillUserMetadata(const 
std::shared_ptr<core::ProcessContext> &
   logger_->log_debug("PutS3Object: User metadata [%s]", user_metadata_);
 }
 
-bool PutS3Object::setProxy(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
-  aws::s3::ProxyOptions proxy;
-  context->getProperty(ProxyHost, proxy.host, flow_file);
-  std::string port_str;
-  if (context->getProperty(ProxyPort, port_str, flow_file) && 
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
-    logger_->log_error("Proxy port invalid");
-    return false;
-  }
-  context->getProperty(ProxyUsername, proxy.username, flow_file);
-  context->getProperty(ProxyPassword, proxy.password, flow_file);
-  if (!proxy.host.empty()) {
-    s3_wrapper_->setProxy(proxy);
-    logger_->log_info("Proxy for PutS3Object was set.");
-  }
-  return true;
-}
-
 void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  if (!context->getProperty(Bucket.getName(), put_s3_request_params_.bucket) 
|| put_s3_request_params_.bucket.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or 
invalid");
-  }
-  logger_->log_debug("PutS3Object: Bucket [%s]", 
put_s3_request_params_.bucket);
+  S3Processor::onSchedule(context, sessionFactory);
 
   if (!context->getProperty(StorageClass.getName(), 
put_s3_request_params_.storage_class)
       || put_s3_request_params_.storage_class.empty()
@@ -301,21 +136,6 @@ void PutS3Object::onSchedule(const 
std::shared_ptr<core::ProcessContext> &contex
   }
   logger_->log_debug("PutS3Object: Storage Class [%s]", 
put_s3_request_params_.storage_class);
 
-  std::string value;
-  if (!context->getProperty(Region.getName(), value) || value.empty() || 
REGIONS.count(value) == 0) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or 
invalid");
-  }
-  s3_wrapper_->setRegion(value);
-  logger_->log_debug("PutS3Object: Region [%s]", value);
-
-  uint64_t timeout_val;
-  if (context->getProperty(CommunicationsTimeout.getName(), value) && 
!value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
-    s3_wrapper_->setTimeout(timeout_val);
-    logger_->log_debug("PutS3Object: Communications Timeout [%d]", 
timeout_val);
-  } else {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout 
missing or invalid");
-  }
-
   if (!context->getProperty(ServerSideEncryption.getName(), 
put_s3_request_params_.server_side_encryption)
       || put_s3_request_params_.server_side_encryption.empty()
       || 
SERVER_SIDE_ENCRYPTIONS.find(put_s3_request_params_.server_side_encryption) == 
SERVER_SIDE_ENCRYPTIONS.end()) {
@@ -374,41 +194,15 @@ bool PutS3Object::setAccessControl(const 
std::shared_ptr<core::ProcessContext> &
 bool PutS3Object::getExpressionLanguageSupportedProperties(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file) {
-  context->getProperty(ObjectKey, put_s3_request_params_.object_key, 
flow_file);
-  if (put_s3_request_params_.object_key.empty()) {
-    if (!flow_file->getAttribute("filename", 
put_s3_request_params_.object_key) || 
put_s3_request_params_.object_key.empty()) {
-      logger_->log_error("No Object Key is set and default object key 
'filename' attribute could not be found!");
-      return false;
-    }
-  }
-  logger_->log_debug("PutS3Object: Object Key [%s]", 
put_s3_request_params_.object_key);
-  if (!context->getProperty(Bucket, put_s3_request_params_.bucket, flow_file) 
|| put_s3_request_params_.bucket.empty()) {
-    logger_->log_error("Bucket is invalid or empty!", 
put_s3_request_params_.bucket);
+  if (!S3Processor::getExpressionLanguageSupportedProperties(context, 
flow_file)) {
     return false;
   }
-  logger_->log_debug("PutS3Object: Bucket [%s]", 
put_s3_request_params_.bucket);
+  put_s3_request_params_.object_key = std::move(object_key_);
+  put_s3_request_params_.bucket = std::move(bucket_);
 
   context->getProperty(ContentType, put_s3_request_params_.content_type, 
flow_file);
   logger_->log_debug("PutS3Object: Content Type [%s]", 
put_s3_request_params_.content_type);
 
-  auto credentials = getAWSCredentials(context, flow_file);
-  if (!credentials) {
-    logger_->log_error("AWS Credentials have not been set!");
-    return false;
-  }
-  s3_wrapper_->setCredentials(credentials.value());
-
-  if (!setProxy(context, flow_file)) {
-    context->yield();
-    return false;
-  }
-
-  std::string value;
-  if (context->getProperty(EndpointOverrideURL, value, flow_file) && 
!value.empty()) {
-    s3_wrapper_->setEndpointOverrideUrl(value);
-    logger_->log_debug("PutS3Object: Endpoint Override URL [%d]", value);
-  }
-
   return setAccessControl(context, flow_file);
 }
 
@@ -441,22 +235,23 @@ void PutS3Object::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context
   logger_->log_debug("PutS3Object onTrigger");
   std::shared_ptr<core::FlowFile> flow_file = session->get();
   if (!flow_file) {
+    context->yield();
     return;
   }
 
   if (!getExpressionLanguageSupportedProperties(context, flow_file)) {
-    context->yield();
+    session->transfer(flow_file, Failure);
     return;
   }
 
   PutS3Object::ReadCallback callback(flow_file->getSize(), 
put_s3_request_params_, s3_wrapper_.get());
   session->read(flow_file, &callback);
   if (callback.result_ == minifi::utils::nullopt) {
-    logger_->log_error("Failed to upload S3 object to bucket %s", 
put_s3_request_params_.bucket);
+    logger_->log_error("Failed to upload S3 object to bucket '%s'", 
put_s3_request_params_.bucket);
     session->transfer(flow_file, Failure);
   } else {
     setAttributes(session, flow_file, callback.result_.value());
-    logger_->log_debug("Successfully uploaded S3 object %s to bucket %s", 
put_s3_request_params_.object_key, put_s3_request_params_.bucket);
+    logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'", 
put_s3_request_params_.object_key, put_s3_request_params_.bucket);
     session->transfer(flow_file, Success);
   }
 }
diff --git a/extensions/aws/processors/PutS3Object.h 
b/extensions/aws/processors/PutS3Object.h
index 5c29b11..2bb340a 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -28,13 +28,11 @@
 #include <set>
 #include <algorithm>
 
-#include "S3Wrapper.h"
-#include "core/Property.h"
-#include "core/Processor.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/OptionalUtils.h"
-#include "AWSCredentialsProvider.h"
+#include "S3Processor.h"
+#include "utils/GeneralUtils.h"
+
+template<typename T>
+class S3TestsFixture;
 
 namespace org {
 namespace apache {
@@ -43,85 +41,34 @@ namespace minifi {
 namespace aws {
 namespace processors {
 
-namespace region {
-
-constexpr const char *AF_SOUTH_1 = "af-south-1";
-constexpr const char *AP_EAST_1 = "ap-east-1";
-constexpr const char *AP_NORTHEAST_1 = "ap-northeast-1";
-constexpr const char *AP_NORTHEAST_2 = "ap-northeast-2";
-constexpr const char *AP_NORTHEAST_3 = "ap-northeast-3";
-constexpr const char *AP_SOUTH_1 = "ap-south-1";
-constexpr const char *AP_SOUTHEAST_1 = "ap-southeast-1";
-constexpr const char *AP_SOUTHEAST_2 = "ap-southeast-2";
-constexpr const char *CA_CENTRAL_1 = "ca-central-1";
-constexpr const char *CN_NORTH_1 = "cn-north-1";
-constexpr const char *CN_NORTHWEST_1 = "cn-northwest-1";
-constexpr const char *EU_CENTRAL_1 = "eu-central-1";
-constexpr const char *EU_NORTH_1 = "eu-north-1";
-constexpr const char *EU_SOUTH_1 = "eu-south-1";
-constexpr const char *EU_WEST_1 = "eu-west-1";
-constexpr const char *EU_WEST_2 = "eu-west-2";
-constexpr const char *EU_WEST_3 = "eu-west-3";
-constexpr const char *ME_SOUTH_1 = "me-south-1";
-constexpr const char *SA_EAST_1 = "sa-east-1";
-constexpr const char *US_EAST_1 = "us-east-1";
-constexpr const char *US_EAST_2 = "us-east-2";
-constexpr const char *US_GOV_EAST_1 = "us-gov-east-1";
-constexpr const char *US_GOV_WEST_1 = "us-gov-west-1";
-constexpr const char *US_WEST_1 = "us-west-1";
-constexpr const char *US_WEST_2 = "us-west-2";
-
-}  // namespace region
-
-class PutS3Object : public core::Processor {
+class PutS3Object : public S3Processor {
  public:
   static constexpr char const* ProcessorName = "PutS3Object";
 
   static const std::set<std::string> CANNED_ACLS;
-  static const std::set<std::string> REGIONS;
   static const std::set<std::string> STORAGE_CLASSES;
   static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
 
   // Supported Properties
-  static const core::Property ObjectKey;
-  static const core::Property Bucket;
   static const core::Property ContentType;
-  static const core::Property AccessKey;
-  static const core::Property SecretKey;
-  static const core::Property CredentialsFile;
-  static const core::Property AWSCredentialsProviderService;
   static const core::Property StorageClass;
   static const core::Property ServerSideEncryption;
-  static const core::Property Region;
-  static const core::Property CommunicationsTimeout;
   static const core::Property FullControlUserList;
   static const core::Property ReadPermissionUserList;
   static const core::Property ReadACLUserList;
   static const core::Property WriteACLUserList;
   static const core::Property CannedACL;
-  static const core::Property EndpointOverrideURL;
-  static const core::Property ProxyHost;
-  static const core::Property ProxyPort;
-  static const core::Property ProxyUsername;
-  static const core::Property ProxyPassword;
-  static const core::Property UseDefaultCredentials;
 
   // Supported Relationships
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
   explicit PutS3Object(std::string name, minifi::utils::Identifier uuid = 
minifi::utils::Identifier())
-      : PutS3Object(name, uuid, 
minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
-  }
-
-  explicit PutS3Object(std::string name, minifi::utils::Identifier uuid, 
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
-      : core::Processor(std::move(name), uuid)
-      , s3_wrapper_(std::move(s3_wrapper)) {
+    : S3Processor(std::move(name), uuid, 
logging::LoggerFactory<PutS3Object>::getLogger()) {
   }
 
   ~PutS3Object() override = default;
 
-  bool supportsDynamicProperties() override { return true; }
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
@@ -169,22 +116,24 @@ class PutS3Object : public core::Processor {
     minifi::utils::optional<minifi::aws::s3::PutObjectResult> result_ = 
minifi::utils::nullopt;
   };
 
+ protected:
+  bool getExpressionLanguageSupportedProperties(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file) override;
+
  private:
-  minifi::utils::optional<Aws::Auth::AWSCredentials> 
getAWSCredentialsFromControllerService(const 
std::shared_ptr<core::ProcessContext> &context) const;
-  minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
+  friend class ::S3TestsFixture<PutS3Object>;
+
+  explicit PutS3Object(std::string name, minifi::utils::Identifier uuid, 
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+    : S3Processor(std::move(name), uuid, 
logging::LoggerFactory<PutS3Object>::getLogger(), std::move(s3_wrapper)) {
+  }
+
   void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
-  bool setProxy(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
-  bool getExpressionLanguageSupportedProperties(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
   std::string parseAccessControlList(const std::string &comma_separated_list) 
const;
   bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::FlowFile> &flow_file);
   bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::FlowFile> &flow_file);
   void setAttributes(const std::shared_ptr<core::ProcessSession> &session,  
const std::shared_ptr<core::FlowFile> &flow_file, const 
minifi::aws::s3::PutObjectResult &put_object_result);
 
-  std::shared_ptr<logging::Logger> 
logger_{logging::LoggerFactory<PutS3Object>::getLogger()};
   aws::s3::PutObjectRequestParameters put_s3_request_params_;
-  std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper_;
   std::string user_metadata_;
-  aws::AWSCredentialsProvider aws_credentials_provider_;
 };
 
 REGISTER_RESOURCE(PutS3Object, "This Processor puts FlowFiles to an Amazon S3 
Bucket.");
diff --git a/extensions/aws/processors/S3Processor.cpp 
b/extensions/aws/processors/S3Processor.cpp
new file mode 100644
index 0000000..1a02b7f
--- /dev/null
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -0,0 +1,262 @@
+/**
+ * @file S3Processor.cpp
+ * Base S3 processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "S3Processor.h"
+
+#include <string>
+#include <set>
+#include <memory>
+
+#include "S3Wrapper.h"
+#include "AWSCredentialsService.h"
+#include "properties/Properties.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::set<std::string> S3Processor::REGIONS({region::AF_SOUTH_1, 
region::AP_EAST_1, region::AP_NORTHEAST_1,
+  region::AP_NORTHEAST_2, region::AP_NORTHEAST_3, region::AP_SOUTH_1, 
region::AP_SOUTHEAST_1, region::AP_SOUTHEAST_2,
+  region::CA_CENTRAL_1, region::CN_NORTH_1, region::CN_NORTHWEST_1, 
region::EU_CENTRAL_1, region::EU_NORTH_1,
+  region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3, 
region::ME_SOUTH_1, region::SA_EAST_1,
+  region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1, 
region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
+
+const core::Property S3Processor::ObjectKey(
+  core::PropertyBuilder::createProperty("Object Key")
+    ->withDescription("The key of the S3 object. If none is given the filename 
attribute will be used by default.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::Bucket(
+  core::PropertyBuilder::createProperty("Bucket")
+    ->withDescription("The S3 bucket")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::AccessKey(
+  core::PropertyBuilder::createProperty("Access Key")
+    ->withDescription("AWS account access key")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::SecretKey(
+  core::PropertyBuilder::createProperty("Secret Key")
+    ->withDescription("AWS account secret key")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::CredentialsFile(
+  core::PropertyBuilder::createProperty("Credentials File")
+    ->withDescription("Path to a file containing AWS access key and secret key 
in properties file format. Properties used: accessKey and secretKey")
+    ->build());
+const core::Property S3Processor::AWSCredentialsProviderService(
+  core::PropertyBuilder::createProperty("AWS Credentials Provider service")
+    ->withDescription("The name of the AWS Credentials Provider controller 
service that is used to obtain AWS credentials.")
+    ->build());
+const core::Property S3Processor::Region(
+  core::PropertyBuilder::createProperty("Region")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(region::US_WEST_2)
+    ->withAllowableValues<std::string>(S3Processor::REGIONS)
+    ->withDescription("AWS Region")
+    ->build());
+const core::Property S3Processor::CommunicationsTimeout(
+  core::PropertyBuilder::createProperty("Communications Timeout")
+    ->isRequired(true)
+    ->withDefaultValue<core::TimePeriodValue>("30 sec")
+    ->withDescription("Sets the timeout of the communication between the AWS 
server and the client")
+    ->build());
+const core::Property S3Processor::EndpointOverrideURL(
+  core::PropertyBuilder::createProperty("Endpoint Override URL")
+    ->withDescription("Endpoint URL to use instead of the AWS default 
including scheme, host, "
+                      "port, and path. The AWS libraries select an endpoint 
URL based on the AWS "
+                      "region, but this property overrides the selected 
endpoint URL, allowing use "
+                      "with other S3-compatible endpoints.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::ProxyHost(
+  core::PropertyBuilder::createProperty("Proxy Host")
+    ->withDescription("Proxy host name or IP")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::ProxyPort(
+  core::PropertyBuilder::createProperty("Proxy Port")
+    ->withDescription("The port number of the proxy host")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::ProxyUsername(
+    core::PropertyBuilder::createProperty("Proxy Username")
+    ->withDescription("Username to set when authenticating against proxy")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::ProxyPassword(
+  core::PropertyBuilder::createProperty("Proxy Password")
+    ->withDescription("Password to set when authenticating against proxy")
+    ->supportsExpressionLanguage(true)
+    ->build());
+const core::Property S3Processor::UseDefaultCredentials(
+    core::PropertyBuilder::createProperty("Use Default Credentials")
+    ->withDescription("If true, uses the Default Credential chain, including 
EC2 instance profiles or roles, environment variables, default user 
credentials, etc.")
+    ->withDefaultValue<bool>(false)
+    ->isRequired(true)
+    ->build());
+
+S3Processor::S3Processor(std::string name, minifi::utils::Identifier uuid, 
const std::shared_ptr<logging::Logger> &logger)
+  : core::Processor(std::move(name), uuid)
+  , logger_(logger)
+  , s3_wrapper_(minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
+  setSupportedProperties({ObjectKey, Bucket, AccessKey, SecretKey, 
CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, 
CommunicationsTimeout,
+                          EndpointOverrideURL, ProxyHost, ProxyPort, 
ProxyUsername, ProxyPassword, UseDefaultCredentials});
+}
+
+S3Processor::S3Processor(std::string name, minifi::utils::Identifier uuid, 
const std::shared_ptr<logging::Logger> &logger, 
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
+  : core::Processor(std::move(name), uuid)
+  , logger_(logger)
+  , s3_wrapper_(std::move(s3_wrapper)) {
+  setSupportedProperties({ObjectKey, Bucket, AccessKey, SecretKey, 
CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, 
CommunicationsTimeout,
+                          EndpointOverrideURL, ProxyHost, ProxyPort, 
ProxyUsername, ProxyPassword, UseDefaultCredentials});
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> 
S3Processor::getAWSCredentialsFromControllerService(const 
std::shared_ptr<core::ProcessContext> &context) const {
+  std::string service_name;
+  if (!context->getProperty(AWSCredentialsProviderService.getName(), 
service_name) || service_name.empty()) {
+    return minifi::utils::nullopt;
+  }
+
+  std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(service_name);
+  if (!service) {
+    return minifi::utils::nullopt;
+  }
+
+  auto aws_credentials_service = 
std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
+  if (!aws_credentials_service) {
+    return minifi::utils::nullopt;
+  }
+
+  return 
minifi::utils::make_optional<Aws::Auth::AWSCredentials>(aws_credentials_service->getAWSCredentials());
+}
+
+minifi::utils::optional<Aws::Auth::AWSCredentials> 
S3Processor::getAWSCredentials(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  auto service_cred = getAWSCredentialsFromControllerService(context);
+  if (service_cred) {
+    logger_->log_info("AWS Credentials successfully set from controller 
service");
+    return service_cred.value();
+  }
+
+  std::string access_key;
+  context->getProperty(AccessKey, access_key, flow_file);
+  aws_credentials_provider_.setAccessKey(access_key);
+  std::string secret_key;
+  context->getProperty(SecretKey, secret_key, flow_file);
+  aws_credentials_provider_.setSecretKey(secret_key);
+  std::string credential_file;
+  context->getProperty(CredentialsFile.getName(), credential_file);
+  aws_credentials_provider_.setCredentialsFile(credential_file);
+  bool use_default_credentials = false;
+  context->getProperty(UseDefaultCredentials.getName(), 
use_default_credentials);
+  aws_credentials_provider_.setUseDefaultCredentials(use_default_credentials);
+
+  return aws_credentials_provider_.getAWSCredentials();
+}
+
+bool S3Processor::setProxy(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::FlowFile> &flow_file) {
+  aws::s3::ProxyOptions proxy;
+  context->getProperty(ProxyHost, proxy.host, flow_file);
+  std::string port_str;
+  if (context->getProperty(ProxyPort, port_str, flow_file) && 
!port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
+    logger_->log_error("Proxy port invalid");
+    return false;
+  }
+  context->getProperty(ProxyUsername, proxy.username, flow_file);
+  context->getProperty(ProxyPassword, proxy.password, flow_file);
+  if (!proxy.host.empty()) {
+    s3_wrapper_->setProxy(proxy);
+    logger_->log_info("Proxy for S3Processor was set.");
+  }
+  return true;
+}
+
+void S3Processor::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (!context->getProperty(Bucket.getName(), bucket_) || bucket_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or 
invalid");
+  }
+  logger_->log_debug("S3Processor: Bucket [%s]", bucket_);
+
+  std::string value;
+  if (!context->getProperty(Region.getName(), value) || value.empty() || 
REGIONS.count(value) == 0) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or 
invalid");
+  }
+  s3_wrapper_->setRegion(value);
+  logger_->log_debug("S3Processor: Region [%s]", value);
+
+  uint64_t timeout_val;
+  if (context->getProperty(CommunicationsTimeout.getName(), value) && 
!value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
+    s3_wrapper_->setTimeout(timeout_val);
+    logger_->log_debug("S3Processor: Communications Timeout [%d]", 
timeout_val);
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout 
missing or invalid");
+  }
+}
+
+bool S3Processor::getExpressionLanguageSupportedProperties(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  context->getProperty(ObjectKey, object_key_, flow_file);
+  if (object_key_.empty() && (!flow_file->getAttribute("filename", 
object_key_) || object_key_.empty())) {
+    logger_->log_error("No Object Key is set and default object key 'filename' 
attribute could not be found!");
+    return false;
+  }
+  logger_->log_debug("S3Processor: Object Key [%s]", object_key_);
+
+  if (!context->getProperty(Bucket, bucket_, flow_file) || bucket_.empty()) {
+    logger_->log_error("Bucket '%s' is invalid or empty!", bucket_);
+    return false;
+  }
+  logger_->log_debug("S3Processor: Bucket [%s]", bucket_);
+
+  auto credentials = getAWSCredentials(context, flow_file);
+  if (!credentials) {
+    logger_->log_error("AWS Credentials have not been set!");
+    return false;
+  }
+  s3_wrapper_->setCredentials(credentials.value());
+
+  if (!setProxy(context, flow_file)) {
+    return false;
+  }
+
+  std::string value;
+  if (context->getProperty(EndpointOverrideURL, value, flow_file) && 
!value.empty()) {
+    s3_wrapper_->setEndpointOverrideUrl(value);
+    logger_->log_debug("S3Processor: Endpoint Override URL [%d]", value);
+  }
+
+  return true;
+}
+
+}  // namespace processors
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/processors/PutS3Object.h 
b/extensions/aws/processors/S3Processor.h
similarity index 50%
copy from extensions/aws/processors/PutS3Object.h
copy to extensions/aws/processors/S3Processor.h
index 5c29b11..a5cb422 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/S3Processor.h
@@ -1,6 +1,6 @@
 /**
- * @file PutS3Object.h
- * PutS3Object class declaration
+ * @file S3Processor.h
+ * Base S3 processor class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,21 +20,20 @@
 
 #pragma once
 
-#include <sstream>
 #include <utility>
-#include <vector>
 #include <memory>
 #include <string>
 #include <set>
-#include <algorithm>
 
-#include "S3Wrapper.h"
+#include "aws/core/auth/AWSCredentialsProvider.h"
+
+#include "S3WrapperBase.h"
+#include "AWSCredentialsProvider.h"
 #include "core/Property.h"
 #include "core/Processor.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/OptionalUtils.h"
-#include "AWSCredentialsProvider.h"
 
 namespace org {
 namespace apache {
@@ -73,32 +72,19 @@ constexpr const char *US_WEST_2 = "us-west-2";
 
 }  // namespace region
 
-class PutS3Object : public core::Processor {
+class S3Processor : public core::Processor {
  public:
-  static constexpr char const* ProcessorName = "PutS3Object";
-
-  static const std::set<std::string> CANNED_ACLS;
   static const std::set<std::string> REGIONS;
-  static const std::set<std::string> STORAGE_CLASSES;
-  static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
 
   // Supported Properties
   static const core::Property ObjectKey;
   static const core::Property Bucket;
-  static const core::Property ContentType;
   static const core::Property AccessKey;
   static const core::Property SecretKey;
   static const core::Property CredentialsFile;
   static const core::Property AWSCredentialsProviderService;
-  static const core::Property StorageClass;
-  static const core::Property ServerSideEncryption;
   static const core::Property Region;
   static const core::Property CommunicationsTimeout;
-  static const core::Property FullControlUserList;
-  static const core::Property ReadPermissionUserList;
-  static const core::Property ReadACLUserList;
-  static const core::Property WriteACLUserList;
-  static const core::Property CannedACL;
   static const core::Property EndpointOverrideURL;
   static const core::Property ProxyHost;
   static const core::Property ProxyPort;
@@ -106,89 +92,26 @@ class PutS3Object : public core::Processor {
   static const core::Property ProxyPassword;
   static const core::Property UseDefaultCredentials;
 
-  // Supported Relationships
-  static const core::Relationship Failure;
-  static const core::Relationship Success;
-
-  explicit PutS3Object(std::string name, minifi::utils::Identifier uuid = 
minifi::utils::Identifier())
-      : PutS3Object(name, uuid, 
minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
-  }
-
-  explicit PutS3Object(std::string name, minifi::utils::Identifier uuid, 
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
-      : core::Processor(std::move(name), uuid)
-      , s3_wrapper_(std::move(s3_wrapper)) {
-  }
-
-  ~PutS3Object() override = default;
+  explicit S3Processor(std::string name, minifi::utils::Identifier uuid, const 
std::shared_ptr<logging::Logger> &logger);
 
   bool supportsDynamicProperties() override { return true; }
-  void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
-
-  class ReadCallback : public InputStreamCallback {
-   public:
-    static const uint64_t MAX_SIZE;
-    static const uint64_t BUFFER_SIZE;
-
-    ReadCallback(uint64_t flow_size, const 
minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3WrapperBase* 
s3_wrapper)
-      : flow_size_(flow_size)
-      , options_(options)
-      , s3_wrapper_(s3_wrapper) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (flow_size_ > MAX_SIZE) {
-        return -1;
-      }
-      std::vector<uint8_t> buffer;
-      auto data_stream = std::make_shared<std::stringstream>();
-      buffer.reserve(BUFFER_SIZE);
-      read_size_ = 0;
-      while (read_size_ < flow_size_) {
-        auto next_read_size = std::min(flow_size_ - read_size_, BUFFER_SIZE);
-        int read_ret = stream->read(buffer.data(), next_read_size);
-        if (read_ret < 0) {
-          return -1;
-        }
-        if (read_ret > 0) {
-          data_stream->write(reinterpret_cast<char*>(buffer.data()), 
next_read_size);
-          read_size_ += read_ret;
-        } else {
-          break;
-        }
-      }
-      result_ = s3_wrapper_->putObject(options_, data_stream);
-      return read_size_;
-    }
-
-    uint64_t flow_size_;
-    const minifi::aws::s3::PutObjectRequestParameters& options_;
-    aws::s3::S3WrapperBase* s3_wrapper_;
-    uint64_t read_size_ = 0;
-    minifi::utils::optional<minifi::aws::s3::PutObjectResult> result_ = 
minifi::utils::nullopt;
-  };
-
- private:
+
+ protected:
+  explicit S3Processor(std::string name, minifi::utils::Identifier uuid, const 
std::shared_ptr<logging::Logger> &logger, 
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper);
+
   minifi::utils::optional<Aws::Auth::AWSCredentials> 
getAWSCredentialsFromControllerService(const 
std::shared_ptr<core::ProcessContext> &context) const;
   minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
-  void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
   bool setProxy(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
-  bool getExpressionLanguageSupportedProperties(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
-  std::string parseAccessControlList(const std::string &comma_separated_list) 
const;
-  bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::FlowFile> &flow_file);
-  bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::FlowFile> &flow_file);
-  void setAttributes(const std::shared_ptr<core::ProcessSession> &session,  
const std::shared_ptr<core::FlowFile> &flow_file, const 
minifi::aws::s3::PutObjectResult &put_object_result);
-
-  std::shared_ptr<logging::Logger> 
logger_{logging::LoggerFactory<PutS3Object>::getLogger()};
-  aws::s3::PutObjectRequestParameters put_s3_request_params_;
+  virtual bool getExpressionLanguageSupportedProperties(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::FlowFile> &flow_file);
+
+  std::shared_ptr<logging::Logger> logger_;
   std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper_;
-  std::string user_metadata_;
+  std::string bucket_;
+  std::string object_key_;
   aws::AWSCredentialsProvider aws_credentials_provider_;
 };
 
-REGISTER_RESOURCE(PutS3Object, "This Processor puts FlowFiles to an Amazon S3 
Bucket.");
-
 }  // namespace processors
 }  // namespace aws
 }  // namespace minifi
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index 825f34a..8056e28 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -35,11 +35,27 @@ minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
S3Wrapper::sendPutObjec
   Aws::S3::Model::PutObjectOutcome outcome = s3_client.PutObject(request);
 
   if (outcome.IsSuccess()) {
-      logger_->log_info("Added S3 object %s to bucket %s", request.GetKey(), 
request.GetBucket());
+      logger_->log_info("Added S3 object '%s' to bucket '%s'", 
request.GetKey(), request.GetBucket());
       return outcome.GetResultWithOwnership();
   } else {
-      logger_->log_error("PutS3Object failed with the following: '%s'", 
outcome.GetError().GetMessage());
-      return minifi::utils::nullopt;
+    logger_->log_error("PutS3Object failed with the following: '%s'", 
outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+bool S3Wrapper::sendDeleteObjectRequest(const 
Aws::S3::Model::DeleteObjectRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  Aws::S3::Model::DeleteObjectOutcome outcome = 
s3_client.DeleteObject(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_info("Deleted S3 object '%s' from bucket '%s'", 
request.GetKey(), request.GetBucket());
+    return true;
+  } else if (outcome.GetError().GetErrorType() == 
Aws::S3::S3Errors::NO_SUCH_KEY) {
+    logger_->log_info("S3 object '%s' was not found in bucket '%s'", 
request.GetKey(), request.GetBucket());
+    return true;
+  } else {
+    logger_->log_error("DeleteS3Object failed with the following: '%s'", 
outcome.GetError().GetMessage());
+    return false;
   }
 }
 
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index 3302122..323744e 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -34,6 +34,7 @@ namespace s3 {
 class S3Wrapper : public S3WrapperBase {
  protected:
   minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override;
+  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& 
request) override;
 };
 
 }  // namespace s3
diff --git a/extensions/aws/s3/S3WrapperBase.cpp 
b/extensions/aws/s3/S3WrapperBase.cpp
index 9cf2fad..3468834 100644
--- a/extensions/aws/s3/S3WrapperBase.cpp
+++ b/extensions/aws/s3/S3WrapperBase.cpp
@@ -124,6 +124,16 @@ minifi::utils::optional<PutObjectResult> 
S3WrapperBase::putObject(const PutObjec
   }
 }
 
+bool S3WrapperBase::deleteObject(const std::string& bucket, const std::string& 
object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+    request.SetVersionId(version);
+  }
+  return sendDeleteObjectRequest(request);
+}
+
 }  // namespace s3
 }  // namespace aws
 }  // namespace minifi
diff --git a/extensions/aws/s3/S3WrapperBase.h 
b/extensions/aws/s3/S3WrapperBase.h
index 8204299..2416792 100644
--- a/extensions/aws/s3/S3WrapperBase.h
+++ b/extensions/aws/s3/S3WrapperBase.h
@@ -28,6 +28,8 @@
 #include "aws/core/auth/AWSCredentialsProvider.h"
 #include "aws/s3/S3Client.h"
 #include "aws/s3/model/PutObjectRequest.h"
+#include "aws/s3/model/PutObjectResult.h"
+#include "aws/s3/model/DeleteObjectRequest.h"
 #include "aws/s3/model/StorageClass.h"
 #include "aws/s3/model/ServerSideEncryption.h"
 #include "aws/s3/model/ObjectCannedACL.h"
@@ -107,11 +109,13 @@ class S3WrapperBase {
   void setProxy(const ProxyOptions& proxy);
 
   minifi::utils::optional<PutObjectResult> putObject(const 
PutObjectRequestParameters& options, std::shared_ptr<Aws::IOStream> 
data_stream);
+  bool deleteObject(const std::string& bucket, const std::string& object_key, 
const std::string& version = "");
 
   virtual ~S3WrapperBase() = default;
 
  protected:
   virtual minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) = 0;
+  virtual bool sendDeleteObjectRequest(const 
Aws::S3::Model::DeleteObjectRequest& request) = 0;
   void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const;
   static std::string getExpiryDate(const std::string& expiration);
   static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption 
encryption);
diff --git a/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp 
b/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
new file mode 100644
index 0000000..6255e2e
--- /dev/null
+++ b/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "S3TestsFixture.h"
+#include "processors/DeleteS3Object.h"
+
+using DeleteS3ObjectTestsFixture = 
S3TestsFixture<minifi::aws::processors::DeleteS3Object>;
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test AWS credential setting", 
"[awsCredentials]") {
+  setBucket();
+
+  SECTION("Test property credentials") {
+    setAccesKeyCredentialsInProcessor();
+  }
+
+  SECTION("Test credentials setting from AWS Credentials service") {
+    setAccessKeyCredentialsInController();
+    setCredentialsService();
+  }
+
+  SECTION("Test credentials file setting") {
+    setCredentialFile(s3_processor);
+  }
+
+  SECTION("Test credentials file setting from AWS Credentials service") {
+    setCredentialFile(aws_credentials_service);
+    setCredentialsService();
+  }
+
+  SECTION("Test credentials setting using default credential chain") {
+    setUseDefaultCredentialsChain(s3_processor);
+  }
+
+  SECTION("Test credentials setting from AWS Credentials service using default 
credential chain") {
+    setUseDefaultCredentialsChain(aws_credentials_service);
+    setCredentialsService();
+  }
+
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSSecretKey() == "secret");
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test required property not set", 
"[awsS3Config]") {
+  SECTION("Test credentials not set") {
+  }
+
+  SECTION("Test no bucket is set") {
+    setAccesKeyCredentialsInProcessor();
+  }
+
+  SECTION("Test no object key is set") {
+    setRequiredProperties();
+    plan->setProperty(update_attribute, "filename", "", true);
+  }
+
+  SECTION("Test region is empty") {
+    setRequiredProperties();
+    plan->setProperty(s3_processor, "Region", "");
+  }
+
+  REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test proxy setting", 
"[awsS3Proxy]") {
+  setRequiredProperties();
+  setProxy();
+  test_controller.runSession(plan, true);
+  checkProxySettings();
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test success case with default 
values", "[awsS3DeleteSuccess]") {
+  setRequiredProperties();
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetBucket() == 
"testBucket");
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetKey() == 
INPUT_FILENAME);
+  REQUIRE(!mock_s3_wrapper_ptr->delete_object_request.VersionIdHasBeenSet());
+  REQUIRE(LogTestController::getInstance().contains("Successfully deleted S3 
object"));
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test version setting", 
"[awsS3DeleteWithVersion]") {
+  setRequiredProperties();
+  plan->setProperty(update_attribute, "s3.version", "v1", true);
+  plan->setProperty(s3_processor, "Version", "${s3.version}");
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetVersionId() == "v1");
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.VersionIdHasBeenSet());
+  REQUIRE(LogTestController::getInstance().contains("Successfully deleted S3 
object"));
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test optional client 
configuration values", "[awsS3DeleteOptionalClientConfig]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Region", 
minifi::aws::processors::region::US_EAST_1);
+  plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
+  plan->setProperty(update_attribute, "test.endpoint", 
"http://localhost:1234";, true);
+  plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == 
minifi::aws::processors::region::US_EAST_1);
+  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
+  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride == 
"http://localhost:1234";);
+}
+
+TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test failure case", 
"[awsS3DeleteFailure]") {
+  auto log_failure = plan->addProcessor(
+      "LogAttribute",
+      "LogFailure",
+      core::Relationship("failure", "d"));
+  plan->addConnection(s3_processor, core::Relationship("failure", "d"), 
log_failure);
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Version", "v1");
+  log_failure->setAutoTerminatedRelationships({{core::Relationship("success", 
"d")}});
+  mock_s3_wrapper_ptr->setDeleteObjectResult(false);
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetBucket() == 
"testBucket");
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetKey() == 
INPUT_FILENAME);
+  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetVersionId() == "v1");
+  REQUIRE(LogTestController::getInstance().contains("Failed to delete S3 
object"));
+}
diff --git a/libminifi/test/aws-tests/MockS3Wrapper.h 
b/libminifi/test/aws-tests/MockS3Wrapper.h
new file mode 100644
index 0000000..6958938
--- /dev/null
+++ b/libminifi/test/aws-tests/MockS3Wrapper.h
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "s3/S3WrapperBase.h"
+
+const std::string S3_VERSION = "1.2.3";
+const std::string S3_ETAG = "\"tag-123\"";
+const std::string S3_ETAG_UNQUOTED = "tag-123";
+const std::string S3_EXPIRATION = "expiry-date=\"Wed, 28 Oct 2020 00:00:00 
GMT\", rule-id=\"my_expiration_rule\"";
+const std::string S3_EXPIRATION_DATE = "Wed, 28 Oct 2020 00:00:00 GMT";
+const Aws::S3::Model::ServerSideEncryption S3_SSEALGORITHM = 
Aws::S3::Model::ServerSideEncryption::aws_kms;
+const std::string S3_SSEALGORITHM_STR = "aws_kms";
+
+class MockS3Wrapper : public minifi::aws::s3::S3WrapperBase {
+ public:
+  minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
+    put_object_request = request;
+
+    Aws::S3::Model::PutObjectResult put_s3_result;
+    if (!return_empty_result_) {
+      put_s3_result.SetVersionId(S3_VERSION);
+      put_s3_result.SetETag(S3_ETAG);
+      put_s3_result.SetExpiration(S3_EXPIRATION);
+      put_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
+    }
+    return put_s3_result;
+  }
+
+  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& 
request) override {
+    delete_object_request = request;
+    return delete_object_result_;
+  }
+
+  Aws::Auth::AWSCredentials getCredentials() const {
+    return credentials_;
+  }
+
+  Aws::Client::ClientConfiguration getClientConfig() const {
+    return client_config_;
+  }
+
+  std::string getPutObjectRequestBody() const {
+    std::istreambuf_iterator<char> buf_it;
+    return 
std::string(std::istreambuf_iterator<char>(*put_object_request.GetBody()), 
buf_it);
+  }
+
+  void returnEmptyS3Result(bool return_empty_result = true) {
+    return_empty_result_ = return_empty_result;
+  }
+
+  void setDeleteObjectResult(bool delete_object_result) {
+    delete_object_result_ = delete_object_result;
+  }
+
+  Aws::S3::Model::PutObjectRequest put_object_request;
+  Aws::S3::Model::DeleteObjectRequest delete_object_request;
+
+ private:
+  bool delete_object_result_ = true;
+  bool return_empty_result_ = false;
+};
diff --git a/libminifi/test/aws-tests/PutS3ObjectTests.cpp 
b/libminifi/test/aws-tests/PutS3ObjectTests.cpp
index fbf47e8..1bc3030 100644
--- a/libminifi/test/aws-tests/PutS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/PutS3ObjectTests.cpp
@@ -16,142 +16,11 @@
  * limitations under the License.
  */
 
-#include <stdlib.h>
-#include <iostream>
-#include <map>
-
-#include "core/Processor.h"
-#include "../TestBase.h"
+#include "S3TestsFixture.h"
 #include "processors/PutS3Object.h"
-#include "processors/GetFile.h"
-#include "processors/LogAttribute.h"
-#include "processors/UpdateAttribute.h"
-#include "s3/S3WrapperBase.h"
-#include "utils/file/FileUtils.h"
-
-const std::string S3_VERSION = "1.2.3";
-const std::string S3_ETAG = "\"tag-123\"";
-const std::string S3_ETAG_UNQUOTED = "tag-123";
-const std::string S3_EXPIRATION = "expiry-date=\"Wed, 28 Oct 2020 00:00:00 
GMT\", rule-id=\"my_expiration_rule\"";
-const std::string S3_EXPIRATION_DATE = "Wed, 28 Oct 2020 00:00:00 GMT";
-const Aws::S3::Model::ServerSideEncryption S3_SSEALGORITHM = 
Aws::S3::Model::ServerSideEncryption::aws_kms;
-const std::string S3_SSEALGORITHM_STR = "aws_kms";
 
-class MockS3Wrapper : public minifi::aws::s3::S3WrapperBase {
+class PutS3ObjectTestsFixture : public 
S3TestsFixture<minifi::aws::processors::PutS3Object> {
  public:
-  Aws::Auth::AWSCredentials getCredentials() const {
-    return credentials_;
-  }
-
-  Aws::Client::ClientConfiguration getClientConfig() const {
-    return client_config_;
-  }
-
-  minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
-    std::istreambuf_iterator<char> buf_it;
-    put_s3_data = 
std::string(std::istreambuf_iterator<char>(*request.GetBody()), buf_it);
-    bucket_name = request.GetBucket();
-    object_key = request.GetKey();
-    storage_class = request.GetStorageClass();
-    server_side_encryption = request.GetServerSideEncryption();
-    metadata_map = request.GetMetadata();
-    content_type = request.GetContentType();
-    fullcontrol_user_list = request.GetGrantFullControl();
-    read_user_list = request.GetGrantRead();
-    read_acl_user_list = request.GetGrantReadACP();
-    write_acl_user_list = request.GetGrantWriteACP();
-    write_acl_user_list = request.GetGrantWriteACP();
-    canned_acl = request.GetACL();
-
-    if (!get_empty_result) {
-      put_s3_result.SetVersionId(S3_VERSION);
-      put_s3_result.SetETag(S3_ETAG);
-      put_s3_result.SetExpiration(S3_EXPIRATION);
-      put_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
-    }
-    return put_s3_result;
-  }
-
-  std::string bucket_name;
-  std::string object_key;
-  Aws::S3::Model::StorageClass storage_class;
-  Aws::S3::Model::ServerSideEncryption server_side_encryption;
-  Aws::S3::Model::PutObjectResult put_s3_result;
-  std::string put_s3_data;
-  std::map<std::string, std::string> metadata_map;
-  std::string content_type;
-  std::string fullcontrol_user_list;
-  std::string read_user_list;
-  std::string read_acl_user_list;
-  std::string write_acl_user_list;
-  Aws::S3::Model::ObjectCannedACL canned_acl;
-  bool get_empty_result = false;
-};
-
-class PutS3ObjectTestsFixture {
- public:
-  PutS3ObjectTestsFixture() {
-    // Disable retrieving AWS metadata for tests
-    #ifdef WIN32
-    _putenv_s("AWS_EC2_METADATA_DISABLED", "true");
-    #else
-    setenv("AWS_EC2_METADATA_DISABLED", "true", 1);
-    #endif
-
-    LogTestController::getInstance().setDebug<TestPlan>();
-    LogTestController::getInstance().setDebug<minifi::core::Processor>();
-    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
-    LogTestController::getInstance().setTrace<processors::GetFile>();
-    LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
-    LogTestController::getInstance().setDebug<processors::LogAttribute>();
-    
LogTestController::getInstance().setTrace<minifi::aws::processors::PutS3Object>();
-
-    // Build MiNiFi processing graph
-    plan = test_controller.createPlan();
-    mock_s3_wrapper_ptr = new MockS3Wrapper();
-    std::unique_ptr<minifi::aws::s3::S3WrapperBase> 
mock_s3_wrapper(mock_s3_wrapper_ptr);
-    put_s3_object = 
std::make_shared<minifi::aws::processors::PutS3Object>("PutS3Object", 
utils::Identifier(), std::move(mock_s3_wrapper));
-
-    char input_dir_mask[] = "/tmp/gt.XXXXXX";
-    auto input_dir = test_controller.createTempDirectory(input_dir_mask);
-    std::ofstream input_file_stream(input_dir + 
utils::file::FileUtils::get_separator() + "input_data.log");
-    input_file_stream << "input_data";
-    input_file_stream.close();
-    get_file = plan->addProcessor("GetFile", "GetFile");
-    plan->setProperty(get_file, processors::GetFile::Directory.getName(), 
input_dir);
-    plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(), 
"false");
-    update_attribute = plan->addProcessor(
-      "UpdateAttribute",
-      "UpdateAttribute",
-      core::Relationship("success", "d"),
-      true);
-    plan->addProcessor(
-      put_s3_object,
-      "PutS3Object",
-      core::Relationship("success", "d"),
-      true);
-    plan->addProcessor(
-      "LogAttribute",
-      "LogAttribute",
-      core::Relationship("success", "d"),
-      true);
-  }
-
-  void setBasicCredentials() {
-    plan->setProperty(put_s3_object, "Access Key", "key");
-    plan->setProperty(put_s3_object, "Secret Key", "secret");
-  }
-
-  void setBucket() {
-    plan->setProperty(update_attribute, "test.bucket", "testBucket", true);
-    plan->setProperty(put_s3_object, "Bucket", "${test.bucket}");
-  }
-
-  void setRequiredProperties() {
-    setBasicCredentials();
-    setBucket();
-  }
-
   void checkPutObjectResults() {
     REQUIRE(LogTestController::getInstance().contains("key:s3.version value:" 
+ S3_VERSION));
     REQUIRE(LogTestController::getInstance().contains("key:s3.etag value:" + 
S3_ETAG_UNQUOTED));
@@ -165,94 +34,36 @@ class PutS3ObjectTestsFixture {
     REQUIRE(!LogTestController::getInstance().contains("key:s3.expiration 
value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
     REQUIRE(!LogTestController::getInstance().contains("key:s3.sseAlgorithm 
value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
   }
-
-  std::string createTempFile(const std::string& filename) {
-    char temp_dir[] = "/tmp/gt.XXXXXX";
-    auto temp_path = test_controller.createTempDirectory(temp_dir);
-    REQUIRE(!temp_path.empty());
-    std::string temp_file(temp_path + utils::file::FileUtils::get_separator() 
+ filename);
-    return temp_file;
-  }
-
-  virtual ~PutS3ObjectTestsFixture() {
-    LogTestController::getInstance().reset();
-  }
-
- protected:
-  TestController test_controller;
-  std::shared_ptr<TestPlan> plan;
-  MockS3Wrapper* mock_s3_wrapper_ptr;
-  std::shared_ptr<core::Processor> put_s3_object;
-  std::shared_ptr<core::Processor> get_file;
-  std::shared_ptr<core::Processor> update_attribute;
 };
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting", 
"[awsCredentials]") {
   setBucket();
 
   SECTION("Test property credentials") {
-    plan->setProperty(update_attribute, "s3.accessKey", "key", true);
-    plan->setProperty(put_s3_object, "Access Key", "${s3.accessKey}");
-    plan->setProperty(update_attribute, "s3.secretKey", "secret", true);
-    plan->setProperty(put_s3_object, "Secret Key", "${s3.secretKey}");
+    setAccesKeyCredentialsInProcessor();
   }
 
-
   SECTION("Test credentials setting from AWS Credentials service") {
-    auto aws_cred_service = plan->addController("AWSCredentialsService", 
"AWSCredentialsService");
-    plan->setProperty(aws_cred_service, "Access Key", "key");
-    plan->setProperty(aws_cred_service, "Secret Key", "secret");
-    plan->setProperty(put_s3_object, "AWS Credentials Provider service", 
"AWSCredentialsService");
+    setAccessKeyCredentialsInController();
+    setCredentialsService();
   }
 
   SECTION("Test credentials file setting") {
-    char in_dir[] = "/tmp/gt.XXXXXX";
-    auto temp_path = test_controller.createTempDirectory(in_dir);
-    REQUIRE(!temp_path.empty());
-    std::string aws_credentials_file(temp_path + 
utils::file::FileUtils::get_separator() + "aws_creds.conf");
-    std::ofstream aws_credentials_file_stream(aws_credentials_file);
-    aws_credentials_file_stream << "accessKey=key" << std::endl;
-    aws_credentials_file_stream << "secretKey=secret" << std::endl;
-    aws_credentials_file_stream.close();
-    plan->setProperty(put_s3_object, "Credentials File", aws_credentials_file);
+    setCredentialFile(s3_processor);
   }
 
   SECTION("Test credentials file setting from AWS Credentials service") {
-    char in_dir[] = "/tmp/gt.XXXXXX";
-    auto temp_path = test_controller.createTempDirectory(in_dir);
-    REQUIRE(!temp_path.empty());
-    std::string aws_credentials_file(temp_path + 
utils::file::FileUtils::get_separator() + "aws_creds.conf");
-    std::ofstream aws_credentials_file_stream(aws_credentials_file);
-    aws_credentials_file_stream << "accessKey=key" << std::endl;
-    aws_credentials_file_stream << "secretKey=secret" << std::endl;
-    aws_credentials_file_stream.close();
-    auto aws_cred_service = plan->addController("AWSCredentialsService", 
"AWSCredentialsService");
-    plan->setProperty(aws_cred_service, "Credentials File", 
aws_credentials_file);
-    plan->setProperty(put_s3_object, "AWS Credentials Provider service", 
"AWSCredentialsService");
+    setCredentialFile(aws_credentials_service);
+    setCredentialsService();
   }
 
   SECTION("Test credentials setting using default credential chain") {
-    #ifdef WIN32
-    _putenv_s("AWS_ACCESS_KEY_ID", "key");
-    _putenv_s("AWS_SECRET_ACCESS_KEY", "secret");
-    #else
-    setenv("AWS_ACCESS_KEY_ID", "key", 1);
-    setenv("AWS_SECRET_ACCESS_KEY", "secret", 1);
-    #endif
-    plan->setProperty(put_s3_object, "Use Default Credentials", "true");
+    setUseDefaultCredentialsChain(s3_processor);
   }
 
   SECTION("Test credentials setting from AWS Credentials service using default 
credential chain") {
-    auto aws_cred_service = plan->addController("AWSCredentialsService", 
"AWSCredentialsService");
-    plan->setProperty(aws_cred_service, "Use Default Credentials", "true");
-    #ifdef WIN32
-    _putenv_s("AWS_ACCESS_KEY_ID", "key");
-    _putenv_s("AWS_SECRET_ACCESS_KEY", "secret");
-    #else
-    setenv("AWS_ACCESS_KEY_ID", "key", 1);
-    setenv("AWS_SECRET_ACCESS_KEY", "secret", 1);
-    #endif
-    plan->setProperty(put_s3_object, "AWS Credentials Provider service", 
"AWSCredentialsService");
+    setUseDefaultCredentialsChain(aws_credentials_service);
+    setCredentialsService();
   }
 
   test_controller.runSession(plan, true);
@@ -265,7 +76,7 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test required 
property not set", "[aw
   }
 
   SECTION("Test no bucket is set") {
-    setBasicCredentials();
+    setAccesKeyCredentialsInProcessor();
   }
 
   SECTION("Test no object key is set") {
@@ -275,17 +86,17 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test required 
property not set", "[aw
 
   SECTION("Test storage class is empty") {
     setRequiredProperties();
-    plan->setProperty(put_s3_object, "Storage Class", "");
+    plan->setProperty(s3_processor, "Storage Class", "");
   }
 
   SECTION("Test region is empty") {
     setRequiredProperties();
-    plan->setProperty(put_s3_object, "Region", "");
+    plan->setProperty(s3_processor, "Region", "");
   }
 
   SECTION("Test no server side encryption is set") {
     setRequiredProperties();
-    plan->setProperty(put_s3_object, "Server Side Encryption", "");
+    plan->setProperty(s3_processor, "Server Side Encryption", "");
   }
 
   REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
@@ -295,25 +106,25 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default 
client configuration",
   setRequiredProperties();
   test_controller.runSession(plan, true);
   REQUIRE(LogTestController::getInstance().contains("key:s3.bucket 
value:testBucket"));
-  REQUIRE(LogTestController::getInstance().contains("key:s3.key 
value:input_data.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:s3.key value:" + 
INPUT_FILENAME));
   REQUIRE(LogTestController::getInstance().contains("key:s3.contenttype 
value:application/octet-stream"));
   checkPutObjectResults();
-  REQUIRE(mock_s3_wrapper_ptr->content_type == "application/octet-stream");
-  REQUIRE(mock_s3_wrapper_ptr->storage_class == 
Aws::S3::Model::StorageClass::STANDARD);
-  REQUIRE(mock_s3_wrapper_ptr->server_side_encryption == 
Aws::S3::Model::ServerSideEncryption::NOT_SET);
-  REQUIRE(mock_s3_wrapper_ptr->canned_acl == 
Aws::S3::Model::ObjectCannedACL::NOT_SET);
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetContentType() == 
"application/octet-stream");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetStorageClass() == 
Aws::S3::Model::StorageClass::STANDARD);
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetServerSideEncryption() == 
Aws::S3::Model::ServerSideEncryption::NOT_SET);
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetACL() == 
Aws::S3::Model::ObjectCannedACL::NOT_SET);
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == 
minifi::aws::processors::region::US_WEST_2);
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 30000);
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride.empty());
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost.empty());
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName.empty());
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword.empty());
-  REQUIRE(mock_s3_wrapper_ptr->put_s3_data == "input_data");
+  REQUIRE(mock_s3_wrapper_ptr->getPutObjectRequestBody() == INPUT_DATA);
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default client configuration 
with empty result", "[awsS3ClientConfig]") {
   setRequiredProperties();
-  mock_s3_wrapper_ptr->get_empty_result = true;
+  mock_s3_wrapper_ptr->returnEmptyS3Result();
   test_controller.runSession(plan, true);
   REQUIRE(LogTestController::getInstance().contains("key:s3.bucket 
value:testBucket"));
   REQUIRE(LogTestController::getInstance().contains("key:s3.key 
value:input_data.log"));
@@ -323,80 +134,70 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default 
client configuration wi
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Set non-default client 
configuration", "[awsS3ClientConfig]") {
   setRequiredProperties();
-  plan->setProperty(put_s3_object, "Object Key", "custom_key");
+  plan->setProperty(s3_processor, "Object Key", "custom_key");
   plan->setProperty(update_attribute, "test.contentType", "application/tar", 
true);
-  plan->setProperty(put_s3_object, "Content Type", "${test.contentType}");
-  plan->setProperty(put_s3_object, "Storage Class", "ReducedRedundancy");
-  plan->setProperty(put_s3_object, "Region", 
minifi::aws::processors::region::US_EAST_1);
-  plan->setProperty(put_s3_object, "Communications Timeout", "10 Sec");
+  plan->setProperty(s3_processor, "Content Type", "${test.contentType}");
+  plan->setProperty(s3_processor, "Storage Class", "ReducedRedundancy");
+  plan->setProperty(s3_processor, "Region", 
minifi::aws::processors::region::US_EAST_1);
+  plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
   plan->setProperty(update_attribute, "test.endpoint", 
"http://localhost:1234";, true);
-  plan->setProperty(put_s3_object, "Endpoint Override URL", 
"${test.endpoint}");
-  plan->setProperty(put_s3_object, "Server Side Encryption", "AES256");
+  plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
+  plan->setProperty(s3_processor, "Server Side Encryption", "AES256");
   test_controller.runSession(plan, true);
   checkPutObjectResults();
   REQUIRE(LogTestController::getInstance().contains("key:s3.bucket 
value:testBucket"));
   REQUIRE(LogTestController::getInstance().contains("key:s3.key 
value:custom_key"));
   REQUIRE(LogTestController::getInstance().contains("key:s3.contenttype 
value:application/tar"));
-  REQUIRE(mock_s3_wrapper_ptr->content_type == "application/tar");
-  REQUIRE(mock_s3_wrapper_ptr->storage_class == 
Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
-  REQUIRE(mock_s3_wrapper_ptr->server_side_encryption == 
Aws::S3::Model::ServerSideEncryption::AES256);
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetContentType() == 
"application/tar");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetStorageClass() == 
Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetServerSideEncryption() == 
Aws::S3::Model::ServerSideEncryption::AES256);
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == 
minifi::aws::processors::region::US_EAST_1);
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
   REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride == 
"http://localhost:1234";);
-  REQUIRE(mock_s3_wrapper_ptr->put_s3_data == "input_data");
+  REQUIRE(mock_s3_wrapper_ptr->getPutObjectRequestBody() == INPUT_DATA);
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test single user metadata", 
"[awsS3MetaData]") {
   setRequiredProperties();
-  plan->setProperty(put_s3_object, "meta_key", "meta_value", true);
+  plan->setProperty(s3_processor, "meta_key", "meta_value", true);
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->metadata_map.at("meta_key") == "meta_value");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key") 
== "meta_value");
   REQUIRE(LogTestController::getInstance().contains("key:s3.usermetadata 
value:meta_key=meta_value"));
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test multiple user metadata", 
"[awsS3MetaData]") {
   setRequiredProperties();
-  plan->setProperty(put_s3_object, "meta_key1", "meta_value1", true);
-  plan->setProperty(put_s3_object, "meta_key2", "meta_value2", true);
+  plan->setProperty(s3_processor, "meta_key1", "meta_value1", true);
+  plan->setProperty(s3_processor, "meta_key2", "meta_value2", true);
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->metadata_map.at("meta_key1") == "meta_value1");
-  REQUIRE(mock_s3_wrapper_ptr->metadata_map.at("meta_key2") == "meta_value2");
+  
REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key1") 
== "meta_value1");
+  
REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key2") 
== "meta_value2");
   REQUIRE(LogTestController::getInstance().contains("key:s3.usermetadata 
value:meta_key1=meta_value1,meta_key2=meta_value2"));
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test proxy setting", 
"[awsS3Proxy]") {
   setRequiredProperties();
-  plan->setProperty(update_attribute, "test.proxyHost", "host", true);
-  plan->setProperty(put_s3_object, "Proxy Host", "${test.proxyHost}");
-  plan->setProperty(update_attribute, "test.proxyPort", "1234", true);
-  plan->setProperty(put_s3_object, "Proxy Port", "${test.proxyPort}");
-  plan->setProperty(update_attribute, "test.proxyUsername", "username", true);
-  plan->setProperty(put_s3_object, "Proxy Username", "${test.proxyUsername}");
-  plan->setProperty(update_attribute, "test.proxyPassword", "password", true);
-  plan->setProperty(put_s3_object, "Proxy Password", "${test.proxyPassword}");
+  setProxy();
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost == "host");
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPort == 1234);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName == "username");
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword == "password");
+  checkProxySettings();
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test access control setting", 
"[awsS3ACL]") {
   setRequiredProperties();
   plan->setProperty(update_attribute, "s3.permissions.full.users", 
"myuserid123, [email protected]", true);
-  plan->setProperty(put_s3_object, "FullControl User List", 
"${s3.permissions.full.users}");
+  plan->setProperty(s3_processor, "FullControl User List", 
"${s3.permissions.full.users}");
   plan->setProperty(update_attribute, "s3.permissions.read.users", 
"myuserid456,[email protected]", true);
-  plan->setProperty(put_s3_object, "Read Permission User List", 
"${s3.permissions.read.users}");
+  plan->setProperty(s3_processor, "Read Permission User List", 
"${s3.permissions.read.users}");
   plan->setProperty(update_attribute, "s3.permissions.readacl.users", 
"myuserid789, otheruser", true);
-  plan->setProperty(put_s3_object, "Read ACL User List", 
"${s3.permissions.readacl.users}");
+  plan->setProperty(s3_processor, "Read ACL User List", 
"${s3.permissions.readacl.users}");
   plan->setProperty(update_attribute, "s3.permissions.writeacl.users", 
"[email protected]", true);
-  plan->setProperty(put_s3_object, "Write ACL User List", 
"${s3.permissions.writeacl.users}");
+  plan->setProperty(s3_processor, "Write ACL User List", 
"${s3.permissions.writeacl.users}");
   plan->setProperty(update_attribute, "s3.permissions.cannedacl", 
"PublicReadWrite", true);
-  plan->setProperty(put_s3_object, "Canned ACL", 
"${s3.permissions.cannedacl}");
+  plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}");
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->fullcontrol_user_list == "id=myuserid123, 
emailAddress=\"[email protected]\"");
-  REQUIRE(mock_s3_wrapper_ptr->read_user_list == "id=myuserid456, 
emailAddress=\"[email protected]\"");
-  REQUIRE(mock_s3_wrapper_ptr->read_acl_user_list == "id=myuserid789, 
id=otheruser");
-  REQUIRE(mock_s3_wrapper_ptr->write_acl_user_list == 
"emailAddress=\"[email protected]\"");
-  REQUIRE(mock_s3_wrapper_ptr->canned_acl == 
Aws::S3::Model::ObjectCannedACL::public_read_write);
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantFullControl() == 
"id=myuserid123, emailAddress=\"[email protected]\"");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantRead() == 
"id=myuserid456, emailAddress=\"[email protected]\"");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantReadACP() == 
"id=myuserid789, id=otheruser");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantWriteACP() == 
"emailAddress=\"[email protected]\"");
+  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetACL() == 
Aws::S3::Model::ObjectCannedACL::public_read_write);
 }
diff --git a/libminifi/test/aws-tests/S3TestsFixture.h 
b/libminifi/test/aws-tests/S3TestsFixture.h
new file mode 100644
index 0000000..7e2e800
--- /dev/null
+++ b/libminifi/test/aws-tests/S3TestsFixture.h
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdlib.h>
+#include <iostream>
+
+#include "core/Processor.h"
+#include "../TestBase.h"
+#include "processors/GetFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "utils/file/FileUtils.h"
+#include "MockS3Wrapper.h"
+
+template<typename T>
+class S3TestsFixture {
+ public:
+  const std::string INPUT_FILENAME = "input_data.log";
+  const std::string INPUT_DATA = "input_data";
+
+  S3TestsFixture() {
+    // Disable retrieving AWS metadata for tests
+    #ifdef WIN32
+    _putenv_s("AWS_EC2_METADATA_DISABLED", "true");
+    #else
+    setenv("AWS_EC2_METADATA_DISABLED", "true", 1);
+    #endif
+
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<processors::GetFile>();
+    LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<T>();
+
+    // Build MiNiFi processing graph
+    plan = test_controller.createPlan();
+    mock_s3_wrapper_ptr = new MockS3Wrapper();
+    std::unique_ptr<minifi::aws::s3::S3WrapperBase> 
mock_s3_wrapper(mock_s3_wrapper_ptr);
+    s3_processor = std::shared_ptr<T>(new T("S3Processor", 
utils::Identifier(), std::move(mock_s3_wrapper)));
+
+    char input_dir_mask[] = "/tmp/gt.XXXXXX";
+    auto input_dir = test_controller.createTempDirectory(input_dir_mask);
+    std::ofstream input_file_stream(input_dir + 
utils::file::FileUtils::get_separator() + INPUT_FILENAME);
+    input_file_stream << INPUT_DATA;
+    input_file_stream.close();
+    get_file = plan->addProcessor("GetFile", "GetFile");
+    plan->setProperty(get_file, processors::GetFile::Directory.getName(), 
input_dir);
+    plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(), 
"false");
+    update_attribute = plan->addProcessor(
+      "UpdateAttribute",
+      "UpdateAttribute",
+      core::Relationship("success", "d"),
+      true);
+    plan->addProcessor(
+      s3_processor,
+      "S3Processor",
+      core::Relationship("success", "d"),
+      true);
+    plan->addProcessor(
+      "LogAttribute",
+      "LogAttribute",
+      core::Relationship("success", "d"),
+      true);
+    aws_credentials_service = plan->addController("AWSCredentialsService", 
"AWSCredentialsService");
+  }
+
+  void setAccesKeyCredentialsInProcessor() {
+    plan->setProperty(update_attribute, "s3.accessKey", "key", true);
+    plan->setProperty(s3_processor, "Access Key", "${s3.accessKey}");
+    plan->setProperty(update_attribute, "s3.secretKey", "secret", true);
+    plan->setProperty(s3_processor, "Secret Key", "${s3.secretKey}");
+  }
+
+  void setAccessKeyCredentialsInController() {
+    plan->setProperty(aws_credentials_service, "Access Key", "key");
+    plan->setProperty(aws_credentials_service, "Secret Key", "secret");
+  }
+
+  template<typename Component>
+  void setCredentialFile(const Component &component) {
+    char in_dir[] = "/tmp/gt.XXXXXX";
+    auto temp_path = test_controller.createTempDirectory(in_dir);
+    REQUIRE(!temp_path.empty());
+    std::string aws_credentials_file(temp_path + 
utils::file::FileUtils::get_separator() + "aws_creds.conf");
+    std::ofstream aws_credentials_file_stream(aws_credentials_file);
+    aws_credentials_file_stream << "accessKey=key" << std::endl;
+    aws_credentials_file_stream << "secretKey=secret" << std::endl;
+    aws_credentials_file_stream.close();
+    plan->setProperty(component, "Credentials File", aws_credentials_file);
+  }
+
+  template<typename Component>
+  void setUseDefaultCredentialsChain(const Component &component) {
+    #ifdef WIN32
+    _putenv_s("AWS_ACCESS_KEY_ID", "key");
+    _putenv_s("AWS_SECRET_ACCESS_KEY", "secret");
+    #else
+    setenv("AWS_ACCESS_KEY_ID", "key", 1);
+    setenv("AWS_SECRET_ACCESS_KEY", "secret", 1);
+    #endif
+    plan->setProperty(component, "Use Default Credentials", "true");
+  }
+
+  void setCredentialsService() {
+    plan->setProperty(s3_processor, "AWS Credentials Provider service", 
"AWSCredentialsService");
+  }
+
+  void setBucket() {
+    plan->setProperty(update_attribute, "test.bucket", "testBucket", true);
+    plan->setProperty(s3_processor, "Bucket", "${test.bucket}");
+  }
+
+  void setRequiredProperties() {
+    setAccesKeyCredentialsInProcessor();
+    setBucket();
+  }
+
+  void setProxy() {
+    plan->setProperty(update_attribute, "test.proxyHost", "host", true);
+    plan->setProperty(s3_processor, "Proxy Host", "${test.proxyHost}");
+    plan->setProperty(update_attribute, "test.proxyPort", "1234", true);
+    plan->setProperty(s3_processor, "Proxy Port", "${test.proxyPort}");
+    plan->setProperty(update_attribute, "test.proxyUsername", "username", 
true);
+    plan->setProperty(s3_processor, "Proxy Username", "${test.proxyUsername}");
+    plan->setProperty(update_attribute, "test.proxyPassword", "password", 
true);
+    plan->setProperty(s3_processor, "Proxy Password", "${test.proxyPassword}");
+  }
+
+  void checkProxySettings() {
+    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost == "host");
+    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPort == 1234);
+    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName == 
"username");
+    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword == 
"password");
+  }
+
+  virtual ~S3TestsFixture() {
+    LogTestController::getInstance().reset();
+  }
+
+ protected:
+  TestController test_controller;
+  std::shared_ptr<TestPlan> plan;
+  MockS3Wrapper* mock_s3_wrapper_ptr;
+  std::shared_ptr<core::Processor> s3_processor;
+  std::shared_ptr<core::Processor> get_file;
+  std::shared_ptr<core::Processor> update_attribute;
+  std::shared_ptr<core::controller::ControllerServiceNode> 
aws_credentials_service;
+};

Reply via email to