This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ae74606  MINIFICPP-1445 - Move docker integration tests to python 
behave
ae74606 is described below

commit ae746065319c89b6df23ef6a1bd6902306087cb8
Author: Adam Hunyadi <[email protected]>
AuthorDate: Tue Jan 12 11:56:03 2021 +0100

    MINIFICPP-1445 - Move docker integration tests to python behave
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #995
---
 docker/DockerVerify.sh                             |  25 +-
 docker/Dockerfile                                  |   1 +
 .../integration/MiNiFi_integration_test_driver.py  | 210 +++++++++++++++++
 docker/test/integration/README.md                  | 193 +---------------
 docker/test/integration/__init__.py                |   0
 docker/test/integration/environment.py             |  27 +++
 .../features/file_system_operations.feature        |  40 ++++
 docker/test/integration/features/http.feature      |  60 +++++
 docker/test/integration/features/https.feature     |  23 ++
 docker/test/integration/features/kafka.feature     |  59 +++++
 docker/test/integration/features/s2s.feature       |  45 ++++
 docker/test/integration/features/s3.feature        | 155 +++++++++++++
 docker/test/integration/minifi/__init__.py         |  69 ------
 docker/test/integration/minifi/core/Cluster.py     |   2 +-
 docker/test/integration/minifi/core/Connectable.py |  45 +---
 .../integration/minifi/core/ControllerService.py   |   1 +
 .../integration/minifi/core/DockerTestCluster.py   | 192 ++++------------
 .../minifi/core/DockerTestDirectoryBindings.py     | 104 +++++++++
 .../integration/minifi/core/FileSystemObserver.py  |  46 ++++
 .../integration/minifi/core/OutputEventHandler.py  |  15 +-
 docker/test/integration/minifi/core/Processor.py   |  11 +
 .../integration/minifi/core/RemoteProcessGroup.py  |  10 +-
 .../test/integration/minifi/core/SSL_cert_utils.py |  54 +++++
 .../minifi/core/SingleNodeDockerCluster.py         | 153 ++++++------
 .../minifi/processors/GenerateFlowFile.py          |   3 +-
 .../test/integration/minifi/processors/GetFile.py  |   2 +-
 .../integration/minifi/processors/HashContent.py   |   8 +
 .../integration/minifi/processors/InvokeHTTP.py    |  18 +-
 .../integration/minifi/processors/ListenHTTP.py    |   4 +-
 .../minifi/processors/PublishKafkaSSL.py           |  16 --
 .../test/integration/minifi/processors/PutFile.py  |   2 +-
 .../integration/minifi/processors/PutS3Object.py   |   2 +-
 docker/test/integration/steps/steps.py             | 256 +++++++++++++++++++++
 docker/test/integration/test_filesystem_ops.py     |  51 ----
 docker/test/integration/test_filter_zero_file.py   |  36 ---
 docker/test/integration/test_hash_content.py       |  32 ---
 docker/test/integration/test_http.py               |  57 -----
 docker/test/integration/test_rdkafka.py            |  98 --------
 docker/test/integration/test_s2s.py                |  38 ---
 docker/test/integration/test_s3.py                 | 144 ------------
 docker/test/integration/test_zero_file.py          |  36 ---
 docker/test/test_https.py                          | 100 --------
 42 files changed, 1287 insertions(+), 1156 deletions(-)

diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index f4efbc9..dc897c2 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -58,7 +58,8 @@ if ! command swig -version &> /dev/null; then
 fi
 
 pip install --upgrade \
-            pytest \
+            behave \
+            pytimeparse \
             docker \
             PyYAML \
             m2crypto \
@@ -71,4 +72,24 @@ export PATH
 PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
 export PYTHONPATH
 
-exec pytest -s -v "${docker_dir}"/test/integration
+BEHAVE_OPTS="-f pretty --logging-level INFO --no-capture"
+
+cd "${docker_dir}/test/integration"
+exec 
+  behave $BEHAVE_OPTS "features/file_system_operations.feature" -n "Get and 
put operations run in a simple flow" &&
+  behave $BEHAVE_OPTS "features/file_system_operations.feature" -n "PutFile 
does not overwrite a file that already exists" &&
+  behave $BEHAVE_OPTS "features/s2s.feature" -n "A MiNiFi instance produces 
and transfers data to a NiFi instance via s2s" &&
+  behave $BEHAVE_OPTS "features/s2s.feature" -n "Zero length files are 
transfered between via s2s if the \"drop empty\" connection property is false" 
&&
+  behave $BEHAVE_OPTS "features/s2s.feature" -n "Zero length files are not 
transfered between via s2s if the \"drop empty\" connection property is true" &&
+  behave $BEHAVE_OPTS "features/http.feature" -n "A MiNiFi instance transfers 
data to another MiNiFi instance" &&
+  behave $BEHAVE_OPTS "features/http.feature" -n "A MiNiFi instance sends data 
through a HTTP proxy and another one listens" &&
+  behave $BEHAVE_OPTS "features/http.feature" -n "A MiNiFi instance and 
transfers hashed data to another MiNiFi instance" &&
+  behave $BEHAVE_OPTS "features/kafka.feature" -n "A MiNiFi instance transfers 
data to a kafka broker" &&
+  behave $BEHAVE_OPTS "features/kafka.feature" -n "PublishKafka sends 
flowfiles to failure when the broker is not available" &&
+  behave $BEHAVE_OPTS "features/kafka.feature" -n "PublishKafka sends can use 
SSL connect" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance transfers 
encoded data to s3" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance transfers 
encoded data through a http proxy to s3" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can remove 
s3 bucket objects" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "Deletion of a s3 object 
through a proxy-server succeeds" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download 
s3 bucket objects directly" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download 
s3 bucket objects via a http-proxy"
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 14ea617..ea6049f 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -36,6 +36,7 @@ RUN apk --update --no-cache upgrade && apk --update 
--no-cache add gcc \
   openjdk8-jre-base \
   openjdk8 \
   autoconf \
+  automake \
   libtool \
   wget \
   gdb \
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py 
b/docker/test/integration/MiNiFi_integration_test_driver.py
new file mode 100644
index 0000000..a1c63a0
--- /dev/null
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -0,0 +1,210 @@
+from subprocess import Popen, PIPE, STDOUT
+
+import docker
+import logging
+import os
+import shutil
+import threading
+import time
+import uuid
+
+from pydoc import locate
+
+from minifi.core.InputPort import InputPort
+
+from minifi.core.DockerTestCluster import DockerTestCluster
+from minifi.core.SingleNodeDockerCluster import SingleNodeDockerCluster
+from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
+
+from minifi.validators.EmptyFilesOutPutValidator import 
EmptyFilesOutPutValidator
+from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
+from minifi.validators.SingleFileOutputValidator import 
SingleFileOutputValidator
+
+class MiNiFi_integration_test():
+    def __init__(self, context):
+        logging.info("MiNiFi_integration_test init")
+        self.test_id = str(uuid.uuid4())
+        self.clusters = {}
+
+        self.connectable_nodes = []
+        # Remote process groups are not connectables
+        self.remote_process_groups = []
+        self.file_system_observer = None
+
+        self.docker_network = None
+
+        self.docker_directory_bindings = DockerTestDirectoryBindings()
+        
self.docker_directory_bindings.create_new_data_directories(self.test_id)
+
+    def __del__(self):
+        logging.info("MiNiFi_integration_test cleanup")
+
+        # Clean up network, for some reason only this order of events work for 
cleanup
+        if self.docker_network is not None:
+            logging.info('Cleaning up network network: %s', 
self.docker_network.name)
+            while len(self.docker_network.containers) != 0:
+                for container in self.docker_network.containers:
+                    self.docker_network.disconnect(container, force=True)
+                self.docker_network.reload()
+            self.docker_network.remove()
+
+        container_ids = []
+        for cluster in self.clusters.values():
+            for container in cluster.containers.values():
+                container_ids.append(container.id)
+            del cluster
+
+        # The cluster deleter is not reliable for cleaning up
+        docker_client = docker.from_env()
+        for container_id in container_ids:    
+            self.delete_docker_container_by_id(container_id)
+
+        del self.docker_directory_bindings
+
+    def delete_docker_container_by_id(self, container_id):
+        docker_client = docker.from_env()
+        try:
+            container = docker_client.containers.get(container_id)
+            container.remove(v=True, force=True)
+        except docker.errors.NotFound:
+            logging.warn("Contaner '%s' is already cleaned up before.", 
container_id)
+            return
+        wait_start_time = time.perf_counter()
+        while (time.perf_counter() - wait_start_time) < 35:
+            try:
+                docker_client.containers.get(container_id)
+                logging.error("Docker container '%s' still exists after 
removal attempt. Waiting for docker daemon to update...", container_id)
+                time.sleep(5)
+            except docker.errors.NotFound:
+                logging.info("Docker container cleanup successful for '%s'.", 
container_id)
+                return
+        logging.error("Failed to clean up docker container '%s'.", 
container_id)
+
+    def docker_path_to_local_path(self, docker_path):
+        return 
self.docker_directory_bindings.docker_path_to_local_path(self.test_id, 
docker_path)
+
+    def get_test_id(self):
+        return self.test_id
+
+    def acquire_cluster(self, name):
+        return self.clusters.setdefault(name, DockerTestCluster())
+
+    def set_up_cluster_network(self):
+        self.docker_network = SingleNodeDockerCluster.create_docker_network()
+        for cluster in self.clusters.values():
+            cluster.set_network(self.docker_network)
+
+    def start(self):
+        logging.info("MiNiFi_integration_test start")
+        self.set_up_cluster_network()
+        for cluster in self.clusters.values():
+            logging.info("Starting cluster %s with an engine of %s", 
cluster.get_name(), cluster.get_engine())
+            
cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
+            cluster.deploy_flow()
+        for cluster_name, cluster in self.clusters.items():
+            startup_success = True
+            logging.info("Engine: %s", cluster.get_engine())
+            if cluster.get_engine() == "minifi-cpp":
+                startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller", 120)
+            elif cluster.get_engine() == "nifi":
+                startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller...", 120)
+            elif cluster.get_engine() == "kafka-broker":
+                startup_success = cluster.wait_for_app_logs("Startup 
complete.", 120)
+            if not startup_success:
+                cluster.log_nifi_output()
+            assert startup_success
+
+    def add_node(self, processor):
+        if processor.get_name() in (elem.get_name() for elem in 
self.connectable_nodes):
+            raise Exception("Trying to register processor with an already 
registered name: \"%s\"" % processor.get_name())
+        self.connectable_nodes.append(processor)
+
+    def get_or_create_node_by_name(self, node_name):
+        node = self.get_node_by_name(node_name) 
+        if node is None:
+            if node_name == "RemoteProcessGroup":
+                raise Exception("Trying to register RemoteProcessGroup without 
an input port or address.")
+            node = locate("minifi.processors." + node_name + "." + node_name)()
+            node.set_name(node_name)
+            self.add_node(node)
+        return node
+
+    def get_node_by_name(self, name):
+        for node in self.connectable_nodes:
+            if name == node.get_name():
+                return node
+        raise Exception("Trying to fetch unknow node: \"%s\"" % name)
+
+    def add_remote_process_group(self, remote_process_group):
+        if remote_process_group.get_name() in (elem.get_name() for elem in 
self.remote_process_groups):
+            raise Exception("Trying to register remote_process_group with an 
already registered name: \"%s\"" % remote_process_group.get_name())
+        self.remote_process_groups.append(remote_process_group)
+
+    def get_remote_process_group_by_name(self, name):
+        for node in self.remote_process_groups:
+            if name == node.get_name():
+                return node
+        raise Exception("Trying to fetch unknow node: \"%s\"" % name)
+
+    @staticmethod
+    def generate_input_port_for_remote_process_group(remote_process_group, 
name):
+        input_port_node = InputPort(name, remote_process_group)
+        # Generate an MD5 hash unique to the remote process group id
+        input_port_node.set_uuid(uuid.uuid3(remote_process_group.get_uuid(), 
"input_port"))
+        return input_port_node
+
+    def add_test_data(self, path, test_data):
+        file_name = str(uuid.uuid4())
+        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, 
path, file_name, test_data.encode('utf-8'))
+
+    def put_test_resource(self, file_name, contents):
+        self.docker_directory_bindings.put_test_resource(self.test_id, 
file_name, contents)
+
+    def get_out_subdir(self, subdir):
+        return self.docker_directory_bindings.get_out_subdir(self.test_id, 
subdir)
+
+    def rm_out_child(self, subdir):
+        self.docker_directory_bindings.rm_out_child(self.test_id, subdir)
+
+    def add_file_system_observer(self, file_system_observer):
+        self.file_system_observer = file_system_observer
+
+    def check_for_no_files_generated(self, timeout_seconds, subdir=''):
+        output_validator = NoFileOutPutValidator()
+        
output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 1, subdir)
+
+    def check_for_file_with_content_generated(self, content, timeout_seconds, 
subdir=''):
+        output_validator = SingleFileOutputValidator(content)
+        
output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 1, subdir)
+
+    def check_for_multiple_empty_files_generated(self, timeout_seconds, 
subdir=''):
+        output_validator = EmptyFilesOutPutValidator()
+        
output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 2, subdir)
+
+    def check_output(self, timeout_seconds, output_validator, max_files, 
subdir):
+        if subdir:
+            output_validator.subdir = subdir
+        self.file_system_observer.wait_for_output(timeout_seconds, max_files)
+        for cluster in self.clusters.values():
+            # Logs for both nifi and minifi, but not other engines
+            cluster.log_nifi_output()
+            assert not cluster.segfault_happened()
+        assert output_validator.validate()
+
+    def check_s3_server_object_data(self, cluster_name, object_data):
+        cluster = self.acquire_cluster(cluster_name)
+        cluster.check_s3_server_object_data(object_data)
+
+    def check_s3_server_object_metadata(self, cluster_name, content_type):
+        cluster = self.acquire_cluster(cluster_name)
+        cluster.check_s3_server_object_metadata(content_type)
+
+    def check_empty_s3_bucket(self, cluster_name):
+        cluster = self.acquire_cluster(cluster_name)
+        assert cluster.is_s3_bucket_empty()
+
+    def check_http_proxy_access(self, cluster_name, url):
+        self.clusters[cluster_name].check_http_proxy_access(url)
diff --git a/docker/test/integration/README.md 
b/docker/test/integration/README.md
index e4fb628..421cfca 100644
--- a/docker/test/integration/README.md
+++ b/docker/test/integration/README.md
@@ -19,180 +19,19 @@ Apache MiNiFi includes a suite of docker-based system 
integration tests. These
 tests are designed to test the integration between distinct MiNiFi instances as
 well as other systems which are available in docker, such as Apache NiFi.
 
-* Currently test_https.py does not work due to the upgrade to NiFi 1.7. This 
will be resolved as
-  soon as possible.
+* Currently there is an extra unused test mockup for testing TLS with 
invoke_http.
+* HashContent tests do not actually seem to test what they advertise.
+* There is a test requirement for PublishKafka, confirming it can handle 
broker outages. This test will be reintroduced once ConsumeKafka is on master 
and has its own similar testing requirements implemented.
 
 ## Test environment
 
-The test framework is written in Python 3 and uses pip3 to add required 
packages.
+The test framework is written in Python 3 and uses pip3 to add required 
packages. It is built on python-behave, a BDD (behavior-driven development) 
testing framework. The feature specifications are written in a human-readable 
format in the features directory. Please refer to the behave documentation for 
details on how the framework performs testing.
 
 The tests use docker containers so docker engine should be installed on your 
system. Check the [get docker](https://docs.docker.com/get-docker/) page for 
further information.
 
 One of the required python packages is the `m2crypto` package which depends on 
`swig` for compilation,
 so `swig` should also be installed on your system (e.g. `sudo apt install 
swig` on debian based systems).
 
-## Test Execution Lifecycle
-
-Each test involves the following stages as part of its execution lifecycle:
-
-### Definition of flows/Flow DSL
-
-Flows are defined using a python-native domain specific language (DSL). The DSL
-supports the standard primitives which make up a NiFi/MiNiFi flow, such as
-processors, connections, and controller services. Several processors defined in
-the DSL have optional, named parameters enabling concise flow expression.
-
-By default, all relationships are set to auto-terminate. If a relationship is
-used, it is automatically taken out of the auto\_terminate list.
-
-**Example Trivial Flow:**
-
-```python
-flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
-```
-
-#### Supported Processors
-
-The following processors/parameters are supported:
-
-**GetFile**
-
-- input\_dir
-
-**PutFile**
-
-- output\_dir
-
-**LogAttribute**
-
-**ListenHTTP**
-
-- port
-- cert=None
-
-**InvokeHTTP**
-
-- url
-- method='GET'
-- ssl\_context\_service=None
-
-#### Remote Process Groups
-
-Remote process groups and input ports are supported.
-
-**Example InputPort/RemoteProcessGroup:**
-
-```python
-port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
-```
-
-InputPorts may be used as inputs or outputs in the flow DSL:
-
-```python
-recv_flow = (port
-             >> LogAttribute()
-             >> PutFile('/tmp/output'))
-
-send_flow = (GetFile('/tmp/input')
-             >> LogAttribute()
-             >> port)
-```
-
-These example flows could be deployed as separate NiFi/MiNiFi instances where
-the send\_flow would send data to the recv\_flow using the site-to-site
-protocol.
-
-### Definition of an output validator
-
-The output validator is responsible for checking the state of a cluster for
-valid output conditions. Currently, the only supported output validator is the
-SingleFileOutputValidator, which looks for a single file to be written to
-/tmp/output by a flow having a given string as its contents.
-
-**Example SingleFileOutputValidator:**
-
-```python
-SingleFileOutputValidator('example output')
-```
-
-This example SingleFileOutputValidator would validate that a single file is
-written with the contents 'example output.'
-
-### Creation of a DockerTestCluster
-
-DockerTestCluster instances are used to deploy one or more flow to a simulated
-or actual multi-host docker cluster. This enables testing of interactions
-between multiple system components, such as MiNiFi flows. Before the test
-cluster is destroyed, an assertion may be performed on the results of the
-*check\_output()* method of the cluster. This invokes the validator supplied at
-construction against the output state of the system.
-
-Creation of a DockerTestCluster is simple:
-
-**Example DockerTestCluster Instantiation:**
-
-```python
-with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-  ...
-  # Perform test operations
-  ...
-  assert cluster.check_output()
-```
-
-Note that a docker cluster must be created inside of a *with* structure to
-ensure that all resources are created and destroyed cleanly.
-
-### Insertion of test input data
-
-Although arbitrary NiFi flows can ingest data from a multitude of sources, a
-MiNiFi system integration test is expected to receive input via deterministed,
-controlled channels. The primary supported method of providing input to a
-MiNiFi system integration test is to insert data into the filesystem at
-/tmp/input.
-
-To write a string to the contents of a file in /tmp/input, use the
-*put\_test\_data()* method.
-
-**Example put\_test\_data() Usage:**
-
-```python
-cluster.put_test_data('test')
-```
-
-This writes a file with a random name to /tmp/input, with the contents 'test.'
-
-To provide a resource to a container, such as a TLS certificate, use the
-*put\_test\_resource()* method to write a resource file to /tmp/resources.
-
-**Example put\_test\_resource() Usage:**
-
-```python
-cluster.put_test_resource('test-resource', 'resource contents')
-```
-
-This writes a file to /tmp/resources/test-resource with the contents 'resource
-contents.'
-
-### Deployment of one or more flows
-
-Deployment of flows to a test cluster is performed using the *deploy\_flow()*
-method of a cluster. Each flow is deployed as a separate docker service having
-its own DNS name. If a name is not provided upon deployment, a random name will
-be used.
-
-**Example deploy\_flow() Usage:**
-
-```python
-cluster.deploy_flow(flow, name='test-flow')
-```
-
-The deploy\_flow function defaults to a MiNiFi - C++ engine, but other engines,
-such as NiFi may be used:
-
-```python
-cluster.deploy_flow(flow, engine='nifi')
-```
-
 ### Execution of one or more flows
 
 Flows are executed immediately upon deployment and according to schedule
@@ -202,27 +41,3 @@ are deployed. Filesystem events are monitored using event 
APIs, ensuring that
 flows are executed immediately upon input availability and output is validated
 immediately after it is written to disk.
 
-### Output validation
-
-As soon as data is written to /tmp/output, the OutputValidator (defined
-according to the documentation above) is executed on the output. The
-*check\_output()* cluster method waits for up to 5 seconds for valid output.
-
-### Cluster teardown/cleanup
-
-The deployment of a test cluster involves creating one or more docker
-containers and networks, as well as temporary files/directories on the host
-system. All resources are cleaned up automatically as long as clusters are
-created within a *with* block.
-
-```python
-
-# Using the with block ensures that all cluster resources are cleaned up after
-# the test cluster is no longer needed.
-
-with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-  ...
-  # Perform test operations
-  ...
-  assert cluster.check_output()
-```
diff --git a/docker/test/integration/__init__.py 
b/docker/test/integration/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/environment.py 
b/docker/test/integration/environment.py
new file mode 100644
index 0000000..68ddd42
--- /dev/null
+++ b/docker/test/integration/environment.py
@@ -0,0 +1,27 @@
+from behave import fixture, use_fixture
+import sys
+sys.path.append('../minifi')
+import logging
+
+from MiNiFi_integration_test_driver import MiNiFi_integration_test
+from minifi import *
+
+def raise_exception(exception):
+    raise exception
+
+@fixture
+def test_driver_fixture(context):
+    logging.info("Integration test setup")
+    context.test = MiNiFi_integration_test(context)
+    yield context.test
+    logging.info("Integration test teardown...")
+    del context.test
+
+def before_scenario(context, scenario):
+    use_fixture(test_driver_fixture, context)
+
+def after_scenario(context, scenario):
+       pass
+
+def before_all(context):
+    context.config.setup_logging()
diff --git a/docker/test/integration/features/file_system_operations.feature 
b/docker/test/integration/features/file_system_operations.feature
new file mode 100644
index 0000000..a11e398
--- /dev/null
+++ b/docker/test/integration/features/file_system_operations.feature
@@ -0,0 +1,40 @@
+Feature: File system operations are handled by the GetFile and PutFile 
processors
+  In order to store and access data on the local file system
+  As a user of MiNiFi
+  I need to have GetFile and PutFile processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: Get and put operations run in a simple flow
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PutFile
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 10 seconds
+
+  Scenario: PutFile does not overwrite a file that already exists
+    Given a set of processors:
+      | type    | name      | uuid                                 |
+      | GetFile | GetFile   | 66259995-11da-41df-bff7-e262d5f6d7c9 |
+      | PutFile | PutFile_1 | 694423a0-26f3-4e95-9f9f-c03b6d6c189d |
+      | PutFile | PutFile_2 | f37e51e9-ad67-4e16-9dc6-ad853b0933e3 |
+      | PutFile | PutFile_3 | f37e51e9-ad67-4e16-9dc6-ad853b0933e3 |
+
+    And these processor properties are set:
+      | processor name | property name   | property value |
+      | GetFile        | Input Directory | /tmp/input     |
+      | PutFile_1      | Input Directory | /tmp           |
+      | PutFile_2      | Directory       | /tmp           |
+      | PutFile_3      | Directory       | /tmp/output    |
+
+    And the processors are connected up as described here:
+      | source name | relationship name | destination name |
+      | GetFile     | success           | PutFile_1        |
+      | PutFile_1   | success           | PutFile_2        |
+      | PutFile_2   | failuire          | PutFile_3        |
+
+    And a file with the content "test" is present in "/tmp/input"
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 10 seconds
diff --git a/docker/test/integration/features/http.feature 
b/docker/test/integration/features/http.feature
new file mode 100644
index 0000000..0652fd9
--- /dev/null
+++ b/docker/test/integration/features/http.feature
@@ -0,0 +1,60 @@
+Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
+  In order to send and receive data via HTTP
+  As a user of MiNiFi
+  I need to have ListenHTTP and InvokeHTTP processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to another MiNiFi instance
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a InvokeHTTP processor with the "Remote URL" property set to 
"http://secondary:8080/contentListener";
+    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "success" relationship of the GetFile processor is connected to 
the InvokeHTTP
+
+    And a ListenHTTP processor with the "Listening Port" property set to 
"8080" in a "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "secondary" flow
+    And the "success" relationship of the ListenHTTP processor is connected to 
the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 30 seconds
+
+  Scenario: A MiNiFi instance sends data through a HTTP proxy and another one 
listens
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a InvokeHTTP processor with the "Remote URL" property set to 
"http://minifi-listen:8080/contentListener";
+    And these processor properties are set to match the http proxy:
+      | processor name | property name             | property value |
+      | InvokeHTTP     | HTTP Method               | POST           |
+      | InvokeHTTP     | Proxy Host                | http-proxy     |
+      | InvokeHTTP     | Proxy Port                | 3128           |
+      | InvokeHTTP     | invokehttp-proxy-username | admin          |
+      | InvokeHTTP     | invokehttp-proxy-password | test101        |
+    And the "success" relationship of the GetFile processor is connected to 
the InvokeHTTP
+
+    And a http proxy server "http-proxy" is set up accordingly 
+
+    And a ListenHTTP processor with the "Listening Port" property set to 
"8080" in a "minifi-listen" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "minifi-listen" flow
+    And the "success" relationship of the ListenHTTP processor is connected to 
the PutFile
+
+    When all instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    And no errors were generated on the "http-proxy" regarding 
"http://minifi-listen:8080/contentListener";
+
+  Scenario: A MiNiFi instance and transfers hashed data to another MiNiFi 
instance
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a HashContent processor with the "Hash Attribute" property set to 
"hash"
+    And a InvokeHTTP processor with the "Remote URL" property set to 
"http://secondary:8080/contentListener";
+    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "success" relationship of the GetFile processor is connected to 
the HashContent
+    And the "success" relationship of the HashContent processor is connected 
to the InvokeHTTP
+
+    And a ListenHTTP processor with the "Listening Port" property set to 
"8080" in a "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "secondary" flow
+    And the "success" relationship of the ListenHTTP processor is connected to 
the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 30 seconds
diff --git a/docker/test/integration/features/https.feature 
b/docker/test/integration/features/https.feature
new file mode 100644
index 0000000..de13b05
--- /dev/null
+++ b/docker/test/integration/features/https.feature
@@ -0,0 +1,23 @@
+Feature: Using SSL context service to send data with TLS
+  In order to send data via HTTPS
+  As a user of MiNiFi
+  I need to have access to the SSLContextService
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance sends data using InvokeHTTP to a receiver using 
ListenHTTP with TLS
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a InvokeHTTP processor with the "Remote URL" property set to 
"https://secondary:4430/contentListener"
+    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+
+    And the "success" relationship of the GetFile processor is connected to 
the InvokeHTTP
+
+    And a ListenHTTP processor with the "Listening Port" property set to 
"4430" in a "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "secondary" flow
+    And the "success" relationship of the ListenHTTP processor is connected to 
the PutFile
+
+    And an ssl context service set up for InvokeHTTP and ListenHTTP
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
diff --git a/docker/test/integration/features/kafka.feature 
b/docker/test/integration/features/kafka.feature
new file mode 100644
index 0000000..2395115
--- /dev/null
+++ b/docker/test/integration/features/kafka.feature
@@ -0,0 +1,59 @@
+Feature: Sending data to a Kafka streaming platform using PublishKafka
+  In order to send data to a Kafka stream
+  As a user of MiNiFi
+  I need to have a PublishKafka processor
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to a kafka broker
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker 
instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected 
to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the 
PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+
+  Scenario: PublishKafka sends flowfiles to failure when the broker is not 
available
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "no broker" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker 
instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PublishKafka
+    And the "failure" relationship of the PublishKafka processor is connected 
to the PutFile
+
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "no broker" is placed in the monitored 
directory in less than 60 seconds
+
+  Scenario: PublishKafka can send data over an SSL connection
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker 
instance
+    And these processor properties are set:
+      | processor name | property name          | property value               
              |
+      | PublishKafka   | Client Name            | LMN                          
              |
+      | PublishKafka   | Known Brokers          | kafka-broker:9093            
              |
+      | PublishKafka   | Topic Name             | test                         
              |
+      | PublishKafka   | Batch Size             | 10                           
              |
+      | PublishKafka   | Compress Codec         | none                         
              |
+      | PublishKafka   | Delivery Guarantee     | 1                            
              |
+      | PublishKafka   | Request Timeout        | 10 sec                       
              |
+      | PublishKafka   | Message Timeout Phrase | 12 sec                       
              |
+      | PublishKafka   | Security CA Key        | /tmp/resources/certs/ca-cert 
              |
+      | PublishKafka   | Security Cert          | 
/tmp/resources/certs/client_LMN_client.pem |
+      | PublishKafka   | Security Pass Phrase   | abcdefgh                     
              |
+      | PublishKafka   | Security Private Key   | 
/tmp/resources/certs/client_LMN_client.key |
+      | PublishKafka   | Security Protocol      | ssl                          
              |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PublishKafka
+    And the "success" relationship of the GetFile processor is connected to 
the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the 
PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
diff --git a/docker/test/integration/features/s2s.feature 
b/docker/test/integration/features/s2s.feature
new file mode 100644
index 0000000..5b8c20a
--- /dev/null
+++ b/docker/test/integration/features/s2s.feature
@@ -0,0 +1,45 @@
+Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
+  In order to transfer data between NiFi and MiNiFi flows
+  As a user of MiNiFi
+  I need to have RemoteProcessGroup flow nodes
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance produces and transfers data to a NiFi instance 
via s2s
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
+    And the "success" relationship of the GetFile processor is connected to 
the input port on the RemoteProcessGroup
+
+    And a NiFi flow "nifi" receiving data from a RemoteProcessGroup 
"from-minifi" on port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the 
PutFile
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
+
+  Scenario: Zero length files are transferred via s2s if the "drop 
empty" connection property is false
+    Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
+    And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
+    And the "success" relationship of the GenerateFlowFile processor is 
connected to the input port on the RemoteProcessGroup
+
+    And a NiFi flow "nifi" receiving data from a RemoteProcessGroup 
"from-minifi" on port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the 
PutFile
+
+    When both instances start up
+    Then at least one empty flowfile is placed in the monitored directory in 
less than 90 seconds
+
+  Scenario: Zero length files are not transferred via s2s if the "drop 
empty" connection property is true
+    Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
+    And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
+    And the "success" relationship of the GenerateFlowFile processor is 
connected to the input port on the RemoteProcessGroup
+    And the connection going to the RemoteProcessGroup has "drop empty" set
+
+    And a NiFi flow "nifi" receiving data from a RemoteProcessGroup 
"from-minifi" on port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the 
PutFile
+
+    When both instances start up
+    Then no files are placed in the monitored directory in 90 seconds of 
running time
diff --git a/docker/test/integration/features/s3.feature 
b/docker/test/integration/features/s3.feature
new file mode 100644
index 0000000..897bf18
--- /dev/null
+++ b/docker/test/integration/features/s3.feature
@@ -0,0 +1,155 @@
+Feature: Sending data from MiNiFi-C++ to an AWS server
+  In order to transfer data to AWS servers and interact with them
+  As a user of MiNiFi
+  I need to have PutS3Object and DeleteS3Object processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers encoded data to s3
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ 
\"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PutS3Object
+    And the "success" relationship of the PutS3Object processor is connected 
to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When both instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+    And the object on the "s3" s3 server is "LH_O#L|FD<FASD{FO#@$#$%^ 
\"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{"
+    And the object content type on the "s3" s3 server is 
"application/octet-stream" and the object metadata matches use metadata
+
+  Scenario: A MiNiFi instance transfers encoded data through a http proxy to s3
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ 
\"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And these processor properties are set to match the http proxy:
+    | processor name  | property name  | property value |
+    | PutS3Object     | Proxy Host     | http-proxy     |
+    | PutS3Object     | Proxy Port     | 3128           |
+    | PutS3Object     | Proxy Username | admin          |
+    | PutS3Object     | Proxy Password | test101        |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PutS3Object
+    And the "success" relationship of the PutS3Object processor is connected 
to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And the http proxy server "http-proxy" is set up 
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
+    And the object on the "s3" s3 server is "LH_O#L|FD<FASD{FO#@$#$%^ 
\"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{"
+    And the object content type on the "s3" s3 server is 
"application/octet-stream" and the object metadata matches use metadata
+    And no errors were generated on the "http-proxy" regarding 
"http://s3-server:9090/test_bucket/test_object_key"
+
+  Scenario: A MiNiFi instance can remove s3 bucket objects
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ 
\"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And a DeleteS3Object processor set up to communicate with the same s3 
server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the processors are connected up as described here:
+      | source name    | relationship name | destination name |
+      | GetFile        | success           | PutS3Object      |
+      | PutS3Object    | success           | DeleteS3Object   |
+      | DeleteS3Object | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When both instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 120 seconds
+    And the object bucket on the "s3" s3 server is empty
+
+  Scenario: Deletion of a non-existent s3 object succeeds
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a DeleteS3Object processor set up to communicate with an s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the DeleteS3Object
+    And the "success" relationship of the DeleteS3Object processor is 
connected to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the DeleteS3Object
+
+    When both instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 120 seconds
+    And the object bucket on the "s3" s3 server is empty
+
+  Scenario: Deletion of a s3 object through a proxy-server succeeds
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ 
\"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And a DeleteS3Object processor set up to communicate with the same s3 
server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And these processor properties are set to match the http proxy:
+      | processor name  | property name  | property value |
+      | DeleteS3Object     | Proxy Host     | http-proxy     |
+      | DeleteS3Object     | Proxy Port     | 3128           |
+      | DeleteS3Object     | Proxy Username | admin          |
+      | DeleteS3Object     | Proxy Password | test101        |
+    And the processors are connected up as described here:
+      | source name    | relationship name | destination name |
+      | GetFile        | success           | PutS3Object      |
+      | PutS3Object    | success           | DeleteS3Object   |
+      | DeleteS3Object | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And the http proxy server "http-proxy" is set up 
+
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 120 seconds
+    And the object bucket on the "s3" s3 server is empty
+    And no errors were generated on the "http-proxy" regarding 
"http://s3-server:9090/test_bucket/test_object_key"
+
+  Scenario: A MiNiFi instance can download s3 bucket objects directly
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And the "success" relationship of the GetFile processor is connected to 
the PutS3Object
+
+    Given a GenerateFlowFile processor with the "File Size" property set to "1 
kB" in a "secondary" flow
+    And a FetchS3Object processor set up to communicate with the same s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the processors are connected up as described here:
+      | source name      | relationship name | destination name |
+      | GenerateFlowFile | success           | FetchS3Object    |
+      | FetchS3Object    | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 120 seconds
+
+  Scenario: A MiNiFi instance can download s3 bucket objects via a http-proxy
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And the "success" relationship of the GetFile processor is connected to 
the PutS3Object
+
+    Given a GenerateFlowFile processor with the "File Size" property set to "1 
kB" in a "secondary" flow
+    And a FetchS3Object processor set up to communicate with the same s3 server
+    And these processor properties are set to match the http proxy:
+      | processor name | property name  | property value |
+      | FetchS3Object  | Proxy Host     | http-proxy     |
+      | FetchS3Object  | Proxy Port     | 3128           |
+      | FetchS3Object  | Proxy Username | admin          |
+      | FetchS3Object  | Proxy Password | test101        |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the processors are connected up as described here:
+      | source name      | relationship name | destination name |
+      | GenerateFlowFile | success           | FetchS3Object    |
+      | FetchS3Object    | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And a http proxy server "http-proxy" is set up accordingly 
+
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 120 seconds
+    And no errors were generated on the "http-proxy" regarding 
"http://s3-server:9090/test_bucket/test_object_key"
diff --git a/docker/test/integration/minifi/__init__.py 
b/docker/test/integration/minifi/__init__.py
index 68d136a..e69de29 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import gzip
-import logging
-import tarfile
-import uuid
-import xml.etree.cElementTree as elementTree
-from xml.etree.cElementTree import Element
-from io import StringIO
-from io import BytesIO
-from textwrap import dedent
-
-import docker
-import os
-import yaml
-from copy import copy
-
-import time
-from collections import OrderedDict
-
-from .core.Connectable import Connectable
-from .core.Cluster import Cluster
-from .core.Connectable import Connectable
-from .core.ControllerService import ControllerService
-from .core.InputPort import InputPort
-from .core.Processor import Processor
-from .core.RemoteProcessGroup import RemoteProcessGroup
-from .core.SingleNodeDockerCluster import SingleNodeDockerCluster
-from .core.SSLContextService import SSLContextService
-from .core.DockerTestCluster import DockerTestCluster
-from .core.OutputEventHandler import OutputEventHandler
-
-from .flow_serialization.Minifi_flow_yaml_serializer import 
Minifi_flow_yaml_serializer
-from .flow_serialization.Nifi_flow_xml_serializer import 
Nifi_flow_xml_serializer
-
-from .processors.GenerateFlowFile import GenerateFlowFile
-from .processors.GetFile import GetFile
-from .processors.InvokeHTTP import InvokeHTTP
-from .processors.ListenHTTP import ListenHTTP
-from .processors.LogAttribute import LogAttribute
-from .processors.PublishKafka import PublishKafka
-from .processors.PublishKafkaSSL import PublishKafkaSSL
-from .processors.PutFile import PutFile
-from .processors.PutS3Object import PutS3Object
-from .processors.DeleteS3Object import DeleteS3Object
-from .processors.FetchS3Object import FetchS3Object
-
-from .validators.OutputValidator import OutputValidator
-from .validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
-from .validators.SegfaultValidator import SegfaultValidator
-from .validators.NoFileOutPutValidator import NoFileOutPutValidator
-from .validators.SingleFileOutputValidator import SingleFileOutputValidator
-from .validators.FileOutputValidator import FileOutputValidator
-
-logging.basicConfig(level=logging.DEBUG)
-
-
diff --git a/docker/test/integration/minifi/core/Cluster.py 
b/docker/test/integration/minifi/core/Cluster.py
index 0818f37..db69ec9 100644
--- a/docker/test/integration/minifi/core/Cluster.py
+++ b/docker/test/integration/minifi/core/Cluster.py
@@ -5,7 +5,7 @@ class Cluster(object):
     Docker swarms, or cloud compute/container services.
     """
 
-    def deploy_flow(self, flow, name=None, vols=None):
+    def deploy_flow(self, name=None):
         """
         Deploys a flow to the cluster.
         """
diff --git a/docker/test/integration/minifi/core/Connectable.py 
b/docker/test/integration/minifi/core/Connectable.py
index c3472a7..6af3a7d 100644
--- a/docker/test/integration/minifi/core/Connectable.py
+++ b/docker/test/integration/minifi/core/Connectable.py
@@ -9,7 +9,7 @@ class Connectable(object):
         self.uuid = uuid.uuid4()
 
         if name is None:
-            self.name = str(self.uuid)
+            self.name = "node_of_" + str(self.uuid)
         else:
             self.name = name
 
@@ -37,41 +37,14 @@ class Connectable(object):
 
         return self
 
-    def __rshift__(self, other):
-        """
-        Right shift operator to support flow DSL, for example:
+    def get_name(self):
+        return self.name
 
-            GetFile('/input') >> LogAttribute() >> PutFile('/output')
+    def set_name(self, name):
+        self.name = name
 
-        """
+    def get_uuid(self):
+        return self.uuid
 
-        connected = copy(self)
-        connected.connections = copy(self.connections)
-
-        if self.out_proc is self:
-            connected.out_proc = connected
-        else:
-            connected.out_proc = copy(connected.out_proc)
-
-        if isinstance(other, tuple):
-            if isinstance(other[0], tuple):
-                for rel_tuple in other:
-                    rel = {rel_tuple[0]: rel_tuple[1]}
-                    connected.out_proc.connect(rel)
-            else:
-                rel = {other[0]: other[1]}
-                connected.out_proc.connect(rel)
-        else:
-            connected.out_proc.connect({'success': other})
-            connected.out_proc = other
-
-        return connected
-
-    def __invert__(self):
-        """
-        Invert operation to set empty file filtering on incoming connections
-        GetFile('/input') >> ~LogAttribute()
-        """
-        self.drop_empty_flowfiles = True
-
-        return self
+    def set_uuid(self, uuid):
+        self.uuid = uuid
diff --git a/docker/test/integration/minifi/core/ControllerService.py 
b/docker/test/integration/minifi/core/ControllerService.py
index d8b4e17..6263ff5 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -1,4 +1,5 @@
 import uuid
+import logging
 
 class ControllerService(object):
     def __init__(self, name=None, properties=None):
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py 
b/docker/test/integration/minifi/core/DockerTestCluster.py
index 34b4bbc..772558a 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -7,80 +7,19 @@ import sys
 import time
 import uuid
 
-from os.path import join
-from threading import Event
-from watchdog.events import FileSystemEventHandler
-from watchdog.observers import Observer
-
-from .OutputEventHandler import OutputEventHandler
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
-from ..validators.FileOutputValidator import FileOutputValidator
 from .utils import retry_check
-
+from .FileSystemObserver import FileSystemObserver
 
 class DockerTestCluster(SingleNodeDockerCluster):
-    def __init__(self, output_validator):
-
-        # Create test input/output directories
-        test_cluster_id = str(uuid.uuid4())
-
+    def __init__(self):
         self.segfault = False
 
-        self.tmp_test_output_dir = '/tmp/.nifi-test-output.' + test_cluster_id
-        self.tmp_test_input_dir = '/tmp/.nifi-test-input.' + test_cluster_id
-        self.tmp_test_resources_dir = '/tmp/.nifi-test-resources.' + 
test_cluster_id
-
-        logging.info('Creating tmp test input dir: %s', 
self.tmp_test_input_dir)
-        os.makedirs(self.tmp_test_input_dir)
-        logging.info('Creating tmp test output dir: %s', 
self.tmp_test_output_dir)
-        os.makedirs(self.tmp_test_output_dir)
-        logging.info('Creating tmp test resource dir: %s', 
self.tmp_test_resources_dir)
-        os.makedirs(self.tmp_test_resources_dir)
-        os.chmod(self.tmp_test_output_dir, 0o777)
-        os.chmod(self.tmp_test_input_dir, 0o777)
-        os.chmod(self.tmp_test_resources_dir, 0o777)
-
-        # Add resources
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on 
DockerVerify.sh
-        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", 
self.tmp_test_resources_dir + "/certs")
-
-        # Point output validator to ephemeral output dir
-        self.output_validator = output_validator
-        if isinstance(output_validator, FileOutputValidator):
-            output_validator.set_output_dir(self.tmp_test_output_dir)
-
-        # Start observing output dir
-        self.done_event = Event()
-        self.event_handler = OutputEventHandler(self.output_validator, 
self.done_event)
-        self.observer = Observer()
-        self.observer.schedule(self.event_handler, self.tmp_test_output_dir)
-        self.observer.start()
-
         super(DockerTestCluster, self).__init__()
 
+    def deploy_flow(self):
 
-
-    def deploy_flow(self,
-                    flow,
-                    name=None,
-                    vols=None,
-                    engine='minifi-cpp'):
-        """
-        Performs a standard container flow deployment with the addition
-        of volumes supporting test input/output directories.
-        """
-
-        if vols is None:
-            vols = {}
-
-        vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
-        vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
-        vols[self.tmp_test_resources_dir] = {'bind': '/tmp/resources', 'mode': 
'rw'}
-
-        super(DockerTestCluster, self).deploy_flow(flow,
-                                                   vols=vols,
-                                                   name=name,
-                                                   engine=engine)
+        super(DockerTestCluster, self).deploy_flow()
 
     def start_flow(self, name):
         container = self.containers[name]
@@ -102,80 +41,54 @@ class DockerTestCluster(SingleNodeDockerCluster):
             return True
         return False
 
-    def put_test_data(self, contents):
-        """
-        Creates a randomly-named file in the test input dir and writes
-        the given content to it.
-        """
-
-        self.test_data = contents
-        file_name = str(uuid.uuid4())
-        file_abs_path = join(self.tmp_test_input_dir, file_name)
-        self.put_file_contents(contents.encode('utf-8'), file_abs_path)
-
-    def put_test_resource(self, file_name, contents):
-        """
-        Creates a resource file in the test resource dir and writes
-        the given content to it.
-        """
-
-        file_abs_path = join(self.tmp_test_resources_dir, file_name)
-        self.put_file_contents(contents, file_abs_path)
-
-    def restart_observer_if_needed(self):
-        if self.observer.is_alive():
-            return
-
-        self.observer = Observer()
-        self.done_event.clear()
-        self.observer.schedule(self.event_handler, self.tmp_test_output_dir)
-        self.observer.start()
-
-    def wait_for_output(self, timeout_seconds):
-        logging.info('Waiting up to %d seconds for test output...', 
timeout_seconds)
-        self.restart_observer_if_needed()
-        self.done_event.wait(timeout_seconds)
-        self.observer.stop()
-        self.observer.join()
-
-    def log_nifi_output(self):
-
+    def get_app_log(self):
         for container in self.containers.values():
             container = self.client.containers.get(container.id)
-            logging.info('Container logs for container \'%s\':\n%s', 
container.name, container.logs().decode("utf-8"))
             if b'Segmentation fault' in container.logs():
                 logging.warn('Container segfaulted: %s', container.name)
                 self.segfault=True
             if container.status == 'running':
-                apps = [("MiNiFi", self.minifi_root + '/logs/minifi-app.log'), 
("NiFi", self.nifi_root + '/logs/nifi-app.log')]
+                apps = [("MiNiFi", self.minifi_root + '/logs/minifi-app.log'), 
("NiFi", self.nifi_root + '/logs/nifi-app.log'), ("Kafka", 
self.kafka_broker_root + '/logs/server.log')]
                 for app in apps:
                     app_log_status, app_log = container.exec_run('/bin/sh -c 
\'cat ' + app[1] + '\'')
                     if app_log_status == 0:
                         logging.info('%s app logs for container \'%s\':\n', 
app[0], container.name)
-                        for line in app_log.decode("utf-8").splitlines():
-                            logging.info(line)
+                        return app_log
                         break
                 else:
                     logging.warning("The container is running, but none of %s 
logs were found", " or ".join([x[0] for x in apps]))
-
             else:
                 logging.info(container.status)
-                logging.info('Could not cat app logs for container \'%s\' 
because it is not running',
-                             container.name)
-            stats = container.stats(stream=False)
-            logging.info('Container stats:\n%s', stats)
-
-    def check_output(self, timeout=10, subdir=''):
-        """
-        Wait for flow output, validate it, and log minifi output.
-        """
-        if subdir:
-            self.output_validator.subdir = subdir
-        self.wait_for_output(timeout)
-        self.log_nifi_output()
-        if self.segfault:
-            return False
-        return self.output_validator.validate()
+                logging.info('Could not cat app logs for container \'%s\' 
because it is not running', container.name)
+        return None
+
+    def wait_for_app_logs(self, log, timeout_seconds, count=1):
+        wait_start_time = time.perf_counter()
+        for container_name, container in self.containers.items():
+            logging.info('Waiting for app-logs `%s` in container `%s`', log, 
container_name)
+            while (time.perf_counter() - wait_start_time) < timeout_seconds:
+                logs = self.get_app_log()
+                if logs is not None and count <= 
logs.decode("utf-8").count(log):
+                    return True
+                if logs is not None:
+                    for line in logs.decode("utf-8").splitlines():
+                        logging.info("App-log: %s", line)
+                time.sleep(1)
+        return False
+
+    def log_nifi_output(self):
+        app_log = self.get_app_log()
+        if app_log is None:
+            return
+        for line in app_log.decode("utf-8").splitlines():
+            logging.info(line)
+
+    def check_minifi_container_started(self):
+        for container in self.containers.values():
+            container = self.client.containers.get(container.id)
+            if b'Segmentation fault' in container.logs():
+                logging.warn('Container segfaulted: %s', container.name)
+                raise Exception("Container failed to start up.")
 
     def check_http_proxy_access(self, url):
         output = subprocess.check_output(["docker", "exec", "http-proxy", 
"cat", "/var/log/squid/access.log"]).decode(sys.stdout.encoding)
@@ -185,10 +98,10 @@ class DockerTestCluster(SingleNodeDockerCluster):
              output.count("TCP_DENIED/407") == 0 and "TCP_MISS" in output)
 
     @retry_check()
-    def check_s3_server_object_data(self):
+    def check_s3_server_object_data(self, test_data):
         s3_mock_dir = subprocess.check_output(["docker", "exec", "s3-server", 
"find", "/tmp/", "-type", "d", "-name", 
"s3mock*"]).decode(sys.stdout.encoding).strip()
         file_data = subprocess.check_output(["docker", "exec", "s3-server", 
"cat", s3_mock_dir + 
"/test_bucket/test_object_key/fileData"]).decode(sys.stdout.encoding)
-        return file_data == self.test_data
+        return file_data == test_data
 
     @retry_check()
     def check_s3_server_object_metadata(self, 
content_type="application/octet-stream", metadata=dict()):
@@ -203,38 +116,17 @@ class DockerTestCluster(SingleNodeDockerCluster):
         ls_result = subprocess.check_output(["docker", "exec", "s3-server", 
"ls", s3_mock_dir + "/test_bucket/"]).decode(sys.stdout.encoding)
         return not ls_result
 
-    def rm_out_child(self, dir):
-        logging.info('Removing %s from output folder', 
os.path.join(self.tmp_test_output_dir, dir))
-        shutil.rmtree(os.path.join(self.tmp_test_output_dir, dir))
-
     def wait_for_container_logs(self, container_name, log, timeout, count=1):
         logging.info('Waiting for logs `%s` in container `%s`', log, 
container_name)
         container = self.containers[container_name]
         check_count = 0
         while check_count <= timeout:
-            if container.logs().decode("utf-8").count(log) == count:
+            if count <= container.logs().decode("utf-8").count(log):
                 return True
             else:
                 check_count += 1
                 time.sleep(1)
         return False
 
-    def put_file_contents(self, contents, file_abs_path):
-        logging.info('Writing %d bytes of content to file: %s', len(contents), 
file_abs_path)
-        with open(file_abs_path, 'wb') as test_input_file:
-            test_input_file.write(contents)
-
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral test resources.
-        """
-
-        logging.info('Removing tmp test input dir: %s', 
self.tmp_test_input_dir)
-        shutil.rmtree(self.tmp_test_input_dir)
-        logging.info('Removing tmp test output dir: %s', 
self.tmp_test_output_dir)
-        shutil.rmtree(self.tmp_test_output_dir)
-        logging.info('Removing tmp test resources dir: %s', 
self.tmp_test_output_dir)
-        shutil.rmtree(self.tmp_test_resources_dir)
-
-        super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
+    def segfault_happened(self):
+        return self.segfault
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py 
b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
new file mode 100644
index 0000000..d9b6a2b
--- /dev/null
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -0,0 +1,104 @@
+import logging
+import os
+import shutil
+
+class DockerTestDirectoryBindings:
+    def __init__(self):
+        self.data_directories = {}
+
+    def __del__(self):
+        self.delete_data_directories()
+
+    def create_new_data_directories(self, test_id):
+        self.data_directories[test_id] = {
+            "input_dir": "/tmp/.nifi-test-input." + test_id,
+            "output_dir": "/tmp/.nifi-test-output." + test_id,
+            "resources_dir": "/tmp/.nifi-test-resources." + test_id
+        }
+
+        [self.create_directory(directory) for directory in 
self.data_directories[test_id].values()]
+
+        # Add resources
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on 
DockerVerify.sh
+        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", 
self.data_directories[test_id]["resources_dir"] + "/certs")
+
+    def get_data_directories(self, test_id):
+        return self.data_directories[test_id]
+
+    def docker_path_to_local_path(self, test_id, docker_path):
+        # Docker paths are currently hard-coded
+        if docker_path == "/tmp/input":
+            return self.data_directories[test_id]["input_dir"]
+        if docker_path == "/tmp/output":
+            return self.data_directories[test_id]["output_dir"]
+        if docker_path == "/tmp/resources":
+            return self.data_directories[test_id]["resources_dir"]
+        # Might be worth reworking these
+        if docker_path == "/tmp/output/success":
+            self.create_directory(self.data_directories[test_id]["output_dir"] 
+ "/success")
+            return self.data_directories[test_id]["output_dir"] + "/success"
+        if docker_path == "/tmp/output/failure":
+            self.create_directory(self.data_directories[test_id]["output_dir"] 
+ "/failure")
+            return self.data_directories[test_id]["output_dir"] + "/failure"
+        raise Exception("Docker directory \"%s\" has no preset bindings." % 
docker_path)
+
+    def get_directory_bindings(self, test_id):
+        """
+        Builds the docker volume bindings that mount this test's local input,
+        output and resources directories into the container under /tmp.
+        """
+        vols = {}
+        vols[self.data_directories[test_id]["input_dir"]] = {"bind": 
"/tmp/input", "mode": "rw"}
+        vols[self.data_directories[test_id]["output_dir"]] = {"bind": 
"/tmp/output", "mode": "rw"}
+        vols[self.data_directories[test_id]["resources_dir"]] = {"bind": 
"/tmp/resources", "mode": "rw"}
+        return vols
+
+    @staticmethod
+    def create_directory(dir):
+        logging.info("Creating tmp dir: %s", dir)
+        os.makedirs(dir)
+        os.chmod(dir, 0o777)
+
+    @staticmethod
+    def delete_tmp_directory(dir):
+        assert dir.startswith("/tmp/")
+        if not dir.endswith("/"):
+            dir = dir + "/"
+        # Sometimes rmtree does not clean up as expected, setting 
ignore_errors does not help either
+        shutil.rmtree(dir, ignore_errors=True)
+
+    def delete_data_directories(self):
+        for directories in self.data_directories.values():
+            for directory in directories.values():
+                self.delete_tmp_directory(directory)
+
+    @staticmethod
+    def put_file_contents(file_abs_path, contents):
+        logging.info('Writing %d bytes of content to file: %s', len(contents), 
file_abs_path)
+        with open(file_abs_path, 'wb') as test_input_file:
+            test_input_file.write(contents)
+
+    def put_test_resource(self, test_id, file_name, contents):
+        """
+        Creates a resource file in the test resource dir and writes
+        the given content to it.
+        """
+
+        file_abs_path = 
os.path.join(self.data_directories[test_id]["resources_dir"], file_name)
+        self.put_file_contents(file_abs_path, contents)
+
+    def put_test_input(self, test_id, file_name, contents):
+        file_abs_path = 
os.path.join(self.data_directories[test_id]["input_dir"], file_name)
+        self.put_file_contents(file_abs_path, contents)
+
+    def put_file_to_docker_path(self, test_id, path, file_name, contents):
+        file_abs_path = os.path.join(self.docker_path_to_local_path(test_id, 
path), file_name)
+        self.put_file_contents(file_abs_path, contents)
+
+    def get_out_subdir(self, test_id, dir):
+        return os.path.join(self.data_directories[test_id]["output_dir"], dir)
+
+    def rm_out_child(self, test_id, dir):
+        child = os.path.join(self.data_directories[test_id]["output_dir"], dir)
+        logging.info('Removing %s from output folder', child)
+        shutil.rmtree(child)
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py 
b/docker/test/integration/minifi/core/FileSystemObserver.py
new file mode 100644
index 0000000..b4c0f7f
--- /dev/null
+++ b/docker/test/integration/minifi/core/FileSystemObserver.py
@@ -0,0 +1,46 @@
+import logging
+import time
+from threading import Event
+
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+from .OutputEventHandler import OutputEventHandler
+from ..validators.FileOutputValidator import FileOutputValidator
+
+class FileSystemObserver(object):
+    def __init__(self, test_output_dir):
+
+        self.test_output_dir = test_output_dir
+
+        # Start observing output dir
+        self.done_event = Event()
+        self.event_handler = OutputEventHandler(self.done_event)
+        self.observer = Observer()
+        self.observer.schedule(self.event_handler, self.test_output_dir)
+        self.observer.start()
+
+    def get_output_dir(self):
+        return self.test_output_dir
+
+    def restart_observer_if_needed(self):
+        if self.observer.is_alive():
+            return
+
+        self.observer = Observer()
+        self.done_event.clear()
+        self.observer.schedule(self.event_handler, self.test_output_dir)
+        self.observer.start()
+
+    def wait_for_output(self, timeout_seconds, max_files):
+        logging.info('Waiting up to %d seconds for test output...', 
timeout_seconds)
+        self.restart_observer_if_needed()
+        wait_start_time = time.perf_counter()
+        for i in range(0, max_files):
+            # Note: The timing on Event.wait() is inaccurate
+            self.done_event.wait(timeout_seconds)
+            current_time = time.perf_counter()
+            if timeout_seconds < (current_time - wait_start_time):
+                break
+        self.observer.stop()
+        self.observer.join()
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py 
b/docker/test/integration/minifi/core/OutputEventHandler.py
index 3d4c984..f505695 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -3,22 +3,15 @@ import logging
 from watchdog.events import FileSystemEventHandler
 
 class OutputEventHandler(FileSystemEventHandler):
-    def __init__(self, validator, done_event):
-        self.validator = validator
+    def __init__(self, done_event):
         self.done_event = done_event
 
     def on_created(self, event):
         logging.info('Output file created: ' + event.src_path)
-        self.check(event)
+        self.done_event.set()
 
     def on_modified(self, event):
         logging.info('Output file modified: ' + event.src_path)
-        self.check(event)
-
-    def check(self, event):
-        if self.validator.validate():
-            logging.info('Output file is valid')
-            self.done_event.set()
-        else:
-            logging.info('Output file is invalid')
 
+    def on_deleted(self, event):
+        logging.info('Output file modified: ' + event.src_path)
diff --git a/docker/test/integration/minifi/core/Processor.py 
b/docker/test/integration/minifi/core/Processor.py
index e7e41e5..ddc2a9d 100644
--- a/docker/test/integration/minifi/core/Processor.py
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -1,5 +1,7 @@
 from .Connectable import Connectable
 
+import logging
+
 class Processor(Connectable):
     def __init__(self,
                  clazz,
@@ -37,6 +39,15 @@ class Processor(Connectable):
         }
         self.schedule.update(schedule)
 
+    def set_property(self, key, value):
+        if value.isdigit():
+            self.properties[key] = int(value)
+        else:
+            self.properties[key] = value
+
+    def set_scheduling_period(self, value):
+        self.schedule['scheduling period'] = value
+
     def nifi_property_key(self, key):
         """
         Returns the Apache NiFi-equivalent property key for the given key. 
This is often, but not always, the same as
diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py 
b/docker/test/integration/minifi/core/RemoteProcessGroup.py
index 6901fb6..6132ad4 100644
--- a/docker/test/integration/minifi/core/RemoteProcessGroup.py
+++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py
@@ -1,8 +1,7 @@
 import uuid
 
 class RemoteProcessGroup(object):
-    def __init__(self, url,
-                 name=None):
+    def __init__(self, url, name=None):
         self.uuid = uuid.uuid4()
 
         if name is None:
@@ -11,3 +10,10 @@ class RemoteProcessGroup(object):
             self.name = name
 
         self.url = url
+
+
+    def get_name(self):
+       return self.name
+
+    def get_uuid(self):
+       return self.uuid
diff --git a/docker/test/integration/minifi/core/SSL_cert_utils.py 
b/docker/test/integration/minifi/core/SSL_cert_utils.py
new file mode 100644
index 0000000..c2461c3
--- /dev/null
+++ b/docker/test/integration/minifi/core/SSL_cert_utils.py
@@ -0,0 +1,54 @@
+import time
+import logging
+
+from M2Crypto import X509, EVP, RSA, ASN1
+
+def gen_cert():
+    """
+    Generate a self-signed TLS certificate for testing
+    """
+
+    req, key = gen_req()
+    pub_key = req.get_pubkey()
+    subject = req.get_subject()
+    cert = X509.X509()
+    # noinspection PyTypeChecker
+    cert.set_serial_number(1)
+    cert.set_version(2)
+    cert.set_subject(subject)
+    t = int(time.time())
+    now = ASN1.ASN1_UTCTIME()
+    now.set_time(t)
+    now_plus_year = ASN1.ASN1_UTCTIME()
+    now_plus_year.set_time(t + 60 * 60 * 24 * 365)
+    cert.set_not_before(now)
+    cert.set_not_after(now_plus_year)
+    issuer = X509.X509_Name()
+    issuer.C = 'US'
+    issuer.CN = 'minifi-listen'
+    cert.set_issuer(issuer)
+    cert.set_pubkey(pub_key)
+    cert.sign(key, 'sha256')
+
+    return cert, key
+
+def rsa_gen_key_callback():
+    pass
+
+def gen_req():
+    """
+    Generate TLS certificate request for testing
+    """
+
+    logging.info('Generating test certificate request')
+    key = EVP.PKey()
+    req = X509.Request()
+    rsa = RSA.gen_key(1024, 65537, rsa_gen_key_callback)
+    key.assign_rsa(rsa)
+    req.set_pubkey(key)
+    name = req.get_subject()
+    name.C = 'US'
+    name.CN = 'minifi-listen'
+    req.sign(key, 'sha256')
+
+    return req, key
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py 
b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index f48c288..b4dede7 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -3,6 +3,7 @@ import docker
 import logging
 import os
 import tarfile
+import time
 import uuid
 
 from collections import OrderedDict
@@ -22,8 +23,13 @@ class SingleNodeDockerCluster(Cluster):
     def __init__(self):
         self.minifi_version = os.environ['MINIFI_VERSION']
         self.nifi_version = '1.7.0'
+        self.engine = 'minifi-cpp'
+        self.flow = None
+        self.name = None
+        self.vols = {}
         self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
         self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
+        self.kafka_broker_root = '/opt/kafka'
         self.network = None
         self.containers = OrderedDict()
         self.images = []
@@ -32,44 +38,84 @@ class SingleNodeDockerCluster(Cluster):
         # Get docker client
         self.client = docker.from_env()
 
-    def deploy_flow(self,
-                    flow,
-                    name=None,
-                    vols=None,
-                    engine='minifi-cpp'):
+    def __del__(self):
+        """
+        Clean up ephemeral cluster resources
+        """
+
+        # Containers and networks are expected to be freed outside of this 
function
+
+        # Clean up images
+        for image in reversed(self.images):
+            logging.info('Cleaning up image: %s', image[0].id)
+            self.client.images.remove(image[0].id, force=True)
+
+        # Clean up tmp files
+        for tmp_file in self.tmp_files:
+            os.remove(tmp_file)
+
+    def set_name(self, name):
+        self.name = name
+
+    def get_name(self):
+        return self.name
+
+    def set_engine(self, engine):
+        self.engine = engine
+
+    def get_engine(self):
+        return self.engine
+
+    def get_flow(self):
+        return self.flow
+
+    def set_flow(self, flow):
+        self.flow = flow
+
+    def set_directory_bindings(self, bindings):
+        self.vols = bindings
+
+    @staticmethod
+    def create_docker_network():
+        net_name = 'minifi_integration_test_network-' + str(uuid.uuid4())
+        logging.info('Creating network: %s', net_name)
+        return docker.from_env().networks.create(net_name)
+
+    def set_network(self, network):
+        self.network = network
+
+    def deploy_flow(self):
         """
         Compiles the flow to a valid config file and overlays it into a new 
image.
         """
 
-        if vols is None:
-            vols = {}
+        if self.vols is None:
+            self.vols = {}
 
-        logging.info('Deploying %s flow...%s', engine,name)
+        if self.name is None:
+            self.name = self.engine + '-' + str(uuid.uuid4())
+            logging.info('Flow name was not provided; using generated name 
\'%s\'', self.name)
 
-        if name is None:
-            name = engine + '-' + str(uuid.uuid4())
-            logging.info('Flow name was not provided; using generated name 
\'%s\'', name)
+        logging.info('Deploying %s flow \"%s\"...', self.engine, self.name)
 
         # Create network if necessary
         if self.network is None:
-            net_name = 'nifi-' + str(uuid.uuid4())
-            logging.info('Creating network: %s', net_name)
-            self.network = self.client.networks.create(net_name)
-
-        if engine == 'nifi':
-            self.deploy_nifi_flow(flow, name, vols)
-        elif engine == 'minifi-cpp':
-            self.deploy_minifi_cpp_flow(flow, name, vols)
-        elif engine == 'kafka-broker':
-            self.deploy_kafka_broker(name)
-        elif engine == 'http-proxy':
+            self.set_network(self.create_docker_network())
+
+        if self.engine == 'nifi':
+            self.deploy_nifi_flow()
+        elif self.engine == 'minifi-cpp':
+            self.deploy_minifi_cpp_flow()
+        elif self.engine == 'kafka-broker':
+            self.deploy_kafka_broker()
+        elif self.engine == 'http-proxy':
             self.deploy_http_proxy()
-        elif engine == 's3-server':
+        elif self.engine == 's3-server':
             self.deploy_s3_server()
         else:
-            raise Exception('invalid flow engine: \'%s\'' % engine)
+            raise Exception('invalid flow engine: \'%s\'' % self.engine)
 
-    def deploy_minifi_cpp_flow(self, flow, name, vols):
+    def deploy_minifi_cpp_flow(self):
 
         # Build configured image
         dockerfile = dedent("""FROM {base_image}
@@ -77,12 +123,12 @@ class SingleNodeDockerCluster(Cluster):
                 ADD config.yml {minifi_root}/conf/config.yml
                 RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
                 USER minificpp
-                """.format(name=name,hostname=name,
+                """.format(name=self.name,hostname=self.name,
                            base_image='apacheminificpp:' + self.minifi_version,
                            minifi_root=self.minifi_root))
 
         serializer = Minifi_flow_yaml_serializer()
-        test_flow_yaml = serializer.serialize(flow)
+        test_flow_yaml = serializer.serialize(self.flow)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
 
         conf_file_buffer = BytesIO()
@@ -105,20 +151,19 @@ class SingleNodeDockerCluster(Cluster):
         finally:
             conf_file_buffer.close()
 
-        logging.info('Creating and running docker container for flow...')
-
         container = self.client.containers.run(
                 configured_image[0],
                 detach=True,
-                name=name,
+                name=self.name,
                 network=self.network.name,
-                volumes=vols)
-
+                volumes=self.vols)
+        self.network.reload()
+        
         logging.info('Started container \'%s\'', container.name)
 
         self.containers[container.name] = container
 
-    def deploy_nifi_flow(self, flow, name, vols):
+    def deploy_nifi_flow(self):
         dockerfile = dedent(r"""FROM {base_image}
                 USER root
                 ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
@@ -126,12 +171,12 @@ class SingleNodeDockerCluster(Cluster):
                 RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' 
{nifi_root}/conf/nifi.properties
                 RUN sed -i -e 
's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' 
{nifi_root}/conf/nifi.properties
                 USER nifi
-                """.format(name=name,
+                """.format(name=self.name,
                            base_image='apache/nifi:' + self.nifi_version,
                            nifi_root=self.nifi_root))
 
         serializer = Nifi_flow_xml_serializer()
-        test_flow_xml = serializer.serialize(flow, self.nifi_version)
+        test_flow_xml = serializer.serialize(self.flow, self.nifi_version)
         logging.info('Using generated flow config xml:\n%s', test_flow_xml)
 
         conf_file_buffer = BytesIO()
@@ -160,16 +205,16 @@ class SingleNodeDockerCluster(Cluster):
         container = self.client.containers.run(
                 configured_image[0],
                 detach=True,
-                name=name,
-                hostname=name,
+                name=self.name,
+                hostname=self.name,
                 network=self.network.name,
-                volumes=vols)
+                volumes=self.vols)
 
         logging.info('Started container \'%s\'', container.name)
 
         self.containers[container.name] = container
 
-    def deploy_kafka_broker(self, name):
+    def deploy_kafka_broker(self):
         logging.info('Creating and running docker containers for kafka 
broker...')
         zookeeper = self.client.containers.run(
                     self.client.images.pull("wurstmeister/zookeeper:latest"),
@@ -287,33 +332,3 @@ class SingleNodeDockerCluster(Cluster):
         except Exception as e:
             logging.info(e)
             raise
-
-    def __enter__(self):
-        """
-        Allocate ephemeral cluster resources.
-        """
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral cluster resources
-        """
-
-        # Clean up containers
-        for container in self.containers.values():
-            logging.info('Cleaning up container: %s', container.name)
-            container.remove(v=True, force=True)
-
-        # Clean up images
-        for image in reversed(self.images):
-            logging.info('Cleaning up image: %s', image[0].id)
-            self.client.images.remove(image[0].id, force=True)
-
-        # Clean up network
-        if self.network is not None:
-            logging.info('Cleaning up network network: %s', self.network.name)
-            self.network.remove()
-
-        # Clean up tmp files
-        for tmp_file in self.tmp_files:
-            os.remove(tmp_file)
diff --git a/docker/test/integration/minifi/processors/GenerateFlowFile.py 
b/docker/test/integration/minifi/processors/GenerateFlowFile.py
index 93d42ca..65af6cf 100644
--- a/docker/test/integration/minifi/processors/GenerateFlowFile.py
+++ b/docker/test/integration/minifi/processors/GenerateFlowFile.py
@@ -1,8 +1,7 @@
 from ..core.Processor import Processor
 
 class GenerateFlowFile(Processor):
-    def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
+    def __init__(self, schedule={'scheduling period': '2 sec'}):
         super(GenerateFlowFile, self).__init__('GenerateFlowFile',
-                       properties={'File Size': file_size},
                        schedule=schedule,
                        auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/GetFile.py 
b/docker/test/integration/minifi/processors/GetFile.py
index 29b8180..23a575d 100644
--- a/docker/test/integration/minifi/processors/GetFile.py
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -1,7 +1,7 @@
 from ..core.Processor import Processor
 
 class GetFile(Processor):
-       def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
+       def __init__(self, input_dir ="/tmp/input", schedule={'scheduling 
period': '2 sec'}):
                super(GetFile, self).__init__('GetFile',
                        properties={'Input Directory': input_dir, 'Keep Source 
File': 'true'},
                        schedule=schedule,
diff --git a/docker/test/integration/minifi/processors/HashContent.py 
b/docker/test/integration/minifi/processors/HashContent.py
new file mode 100644
index 0000000..f88f05f
--- /dev/null
+++ b/docker/test/integration/minifi/processors/HashContent.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class HashContent(Processor):
+       def __init__(self, schedule={"scheduling period": "2 sec"}):
+               super(HashContent, self).__init__("HashContent",
+                       properties={"Hash Attribute": "hash"},
+                       schedule=schedule,
+                       auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/InvokeHTTP.py 
b/docker/test/integration/minifi/processors/InvokeHTTP.py
index 135c8d5..a19688e 100644
--- a/docker/test/integration/minifi/processors/InvokeHTTP.py
+++ b/docker/test/integration/minifi/processors/InvokeHTTP.py
@@ -1,21 +1,14 @@
 from ..core.Processor import Processor
 
 class InvokeHTTP(Processor):
-    def __init__(self, url,
-        method='GET',
-        proxy_host='',
-        proxy_port='',
-        proxy_username='',
-        proxy_password='',
+    def __init__(self,
         ssl_context_service=None,
         schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
             properties = {
-                'Remote URL': url,
-                'HTTP Method': method,
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'invokehttp-proxy-username': proxy_username,
-                'invokehttp-proxy-password': proxy_password }
+                "Proxy Host": "",
+                "Proxy Port": "",
+                "invokehttp-proxy-username": "",
+                "invokehttp-proxy-password": "" }
 
             controller_services = []
 
@@ -28,3 +21,4 @@ class InvokeHTTP(Processor):
                 controller_services = controller_services,
                 auto_terminate = ['success', 'response', 'retry', 'failure', 
'no retry'],
                 schedule = schedule)
+            self.out_proc.connect({"failure": self})
diff --git a/docker/test/integration/minifi/processors/ListenHTTP.py 
b/docker/test/integration/minifi/processors/ListenHTTP.py
index 6f5bca1..7eadc92 100644
--- a/docker/test/integration/minifi/processors/ListenHTTP.py
+++ b/docker/test/integration/minifi/processors/ListenHTTP.py
@@ -1,8 +1,8 @@
 from ..core.Processor import Processor
 
 class ListenHTTP(Processor):
-    def __init__(self, port, cert=None, schedule=None):
-        properties = {'Listening Port': port}
+    def __init__(self, cert=None, schedule=None):
+        properties = {}
 
         if cert is not None:
             properties['SSL Certificate'] = cert
diff --git a/docker/test/integration/minifi/processors/PublishKafkaSSL.py 
b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
deleted file mode 100644
index 82c33f6..0000000
--- a/docker/test/integration/minifi/processors/PublishKafkaSSL.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from ..core.Processor import Processor
-
-class PublishKafkaSSL(Processor):
-    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PublishKafkaSSL, self).__init__('PublishKafka',
-            properties={'Client Name': 'LMN', 'Known Brokers': 
'kafka-broker:9093',
-                'Topic Name': 'test', 'Batch Size': '10',
-                'Compress Codec': 'none', 'Delivery Guarantee': '1',
-                'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
-                'Security CA': '/tmp/resources/certs/ca-cert',
-                'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
-                'Security Pass Phrase': 'abcdefgh',
-                'Security Private Key': 
'/tmp/resources/certs/client_LMN_client.key',
-                'Security Protocol': 'ssl'},
-            auto_terminate=['success'],
-            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutFile.py 
b/docker/test/integration/minifi/processors/PutFile.py
index 047d32d..db307dd 100644
--- a/docker/test/integration/minifi/processors/PutFile.py
+++ b/docker/test/integration/minifi/processors/PutFile.py
@@ -1,7 +1,7 @@
 from ..core.Processor import Processor
 
 class PutFile(Processor):
-    def __init__(self, output_dir, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
+    def __init__(self, output_dir="/tmp/output", schedule={'scheduling 
strategy': 'EVENT_DRIVEN'}):
         super(PutFile, self).__init__('PutFile',
             properties={'Directory': output_dir, 'Directory Permissions': 
'777', 'Permissions': '777'},
             auto_terminate=['success', 'failure'],
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py 
b/docker/test/integration/minifi/processors/PutS3Object.py
index 74fb4a8..ab8b1be 100644
--- a/docker/test/integration/minifi/processors/PutS3Object.py
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -17,4 +17,4 @@ class PutS3Object(Processor):
                 'Proxy Port': proxy_port,
                 'Proxy Username': proxy_username,
                 'Proxy Password': proxy_password },
-            auto_terminate = ['success'])
+            auto_terminate = ["success", "failure"])
diff --git a/docker/test/integration/steps/steps.py 
b/docker/test/integration/steps/steps.py
new file mode 100644
index 0000000..e376c2f
--- /dev/null
+++ b/docker/test/integration/steps/steps.py
@@ -0,0 +1,256 @@
+from MiNiFi_integration_test_driver import MiNiFi_integration_test
+
+from minifi.core.DockerTestCluster import DockerTestCluster
+from minifi.core.FileSystemObserver import FileSystemObserver
+from minifi.core.RemoteProcessGroup import RemoteProcessGroup
+from minifi.core.InputPort import InputPort
+from minifi.core.SSLContextService import SSLContextService
+from minifi.core.SSL_cert_utils import gen_cert, gen_req, rsa_gen_key_callback
+
+from minifi.processors.PublishKafka import PublishKafka
+from minifi.processors.PutS3Object import PutS3Object
+from minifi.processors.DeleteS3Object import DeleteS3Object
+from minifi.processors.FetchS3Object import FetchS3Object
+
+
+from behave import given, then, when
+from behave.model_describe import ModelDescriptor
+from copy import copy
+from copy import deepcopy
+from pydoc import locate
+from pytimeparse.timeparse import timeparse
+
+import os
+import logging
+import re
+import time
+import uuid
+
+# Background
+
+@given("the content of \"{directory}\" is monitored")
+def step_impl(context, directory):
+    
context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
+
+# MiNiFi cluster setups
+
+@given("a {processor_type} processor with the \"{property}\" property set to 
\"{property_value}\" in a \"{cluster_name}\" flow")
+@given("a {processor_type} processor with the \"{property}\" property set to 
\"{property_value}\" in the \"{cluster_name}\" flow")
+def step_impl(context, processor_type, property, property_value, cluster_name):
+    logging.info("Acquiring " + cluster_name)
+    cluster = context.test.acquire_cluster(cluster_name)
+    processor = locate("minifi.processors." + processor_type + "." + 
processor_type)()
+    processor.set_property(property, property_value)
+    processor.set_name(processor_type)
+    context.test.add_node(processor)
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_name(cluster_name)
+        cluster.set_flow(processor)
+
+
+@given("a {processor_type} processor with the \"{property}\" property set to 
\"{property_value}\"")
+def step_impl(context, processor_type, property, property_value):
+    context.execute_steps("given a {processor_type} processor with the 
\"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" 
flow".
+        format(processor_type=processor_type, property=property, 
property_value=property_value, cluster_name="primary_cluster"))
+
+@given("a set of processors in the \"{cluster_name}\" flow")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    logging.info(context.table)
+    for row in context.table:
+        processor = locate("minifi.processors." + row["type"] + "." + 
row["type"])()
+        processor.set_name(row["name"])
+        processor.set_uuid(row["uuid"])
+        context.test.add_node(processor)
+        # Assume that the first node declared is primary unless specified 
otherwise
+        if cluster.get_flow() is None:
+            cluster.set_flow(processor)
+
+@given("a set of processors")
+def step_impl(context):
+    rendered_table = ModelDescriptor.describe_table(context.table, "    ")
+    context.execute_steps("""given a set of processors in the 
\"{cluster_name}\" flow
+        {table}
+        """.format(cluster_name="primary_cluster", table=rendered_table))
+
+@given("a RemoteProcessGroup node opened on \"{address}\"")
+def step_impl(context, address):
+    remote_process_group = RemoteProcessGroup(address, "RemoteProcessGroup")
+    context.test.add_remote_process_group(remote_process_group)
+
+@given("a PutS3Object processor set up to communicate with an s3 server")
+def step_impl(context):
+    # PutS3Object is never the first node of a flow, so the potential 
cluster-flow setup is omitted
+    put_s3 = PutS3Object()
+    put_s3.set_name("PutS3Object")
+    context.test.add_node(put_s3)
+
+@given("a DeleteS3Object processor set up to communicate with the same s3 
server")
+@given("a DeleteS3Object processor set up to communicate with an s3 server")
+def step_impl(context):
+    delete_s3 = DeleteS3Object()
+    delete_s3.set_name("DeleteS3Object")
+    context.test.add_node(delete_s3)
+
+@given("a FetchS3Object processor set up to communicate with the same s3 
server")
+@given("a FetchS3Object processor set up to communicate with an s3 server")
+def step_impl(context):
+    fetch_s3 = FetchS3Object()
+    fetch_s3.set_name("FetchS3Object")
+    context.test.add_node(fetch_s3)
+
+@given("a PublishKafka processor set up to communicate with a kafka broker 
instance")
+def step_impl(context):
+    # PublishKafka is never the first node of a flow, so the potential 
cluster-flow setup is omitted
+    publish_kafka = PublishKafka()
+    publish_kafka.set_name("PublishKafka")
+    context.test.add_node(publish_kafka)
+
+@given("the \"{property_name}\" of the {processor_name} processor is set to 
\"{property_value}\"")
+def step_impl(context, property_name, processor_name, property_value):
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_property(property_name, property_value)
+
+
+@given("the scheduling period of the {processor_name} processor is set to 
\"{sceduling_period}\"")
+def step_impl(context, processor_name, sceduling_period):
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_scheduling_period(sceduling_period)
+
+@given("these processor properties are set")
+@given("these processor properties are set to match the http proxy")
+def step_impl(context):
+    for row in context.table:
+        context.test.get_node_by_name(row["processor 
name"]).set_property(row["property name"], row["property value"])
+
+@given("the \"{relationship}\" relationship of the {source_name} processor is 
connected to the input port on the {remote_process_group_name}")
+def step_impl(context, relationship, source_name, remote_process_group_name):
+    source = context.test.get_node_by_name(source_name)
+    remote_process_group = 
context.test.get_remote_process_group_by_name(remote_process_group_name)
+    input_port_node = 
context.test.generate_input_port_for_remote_process_group(remote_process_group, 
"to_nifi")
+    context.test.add_node(input_port_node)
+    source.out_proc.connect({relationship: input_port_node})
+
+@given("the \"{relationship}\" relationship of the {source_name} is connected 
to the {destination_name}")
+@given("the \"{relationship}\" relationship of the {source_name} processor is 
connected to the {destination_name}")
+def step_impl(context, relationship, source_name, destination_name):
+    source = context.test.get_node_by_name(source_name)
+    destination = context.test.get_node_by_name(destination_name)
+    source.out_proc.connect({relationship: destination})
+
+@given("the processors are connected up as described here")
+def step_impl(context):
+    for row in context.table:
+        context.execute_steps(
+            "given the \"" + row["relationship name"] + "\" relationship of 
the " + row["source name"] + " processor is connected to the " + 
row["destination name"])
+
+@given("the connection going to the RemoteProcessGroup has \"drop empty\" set")
+def step_impl(context):
+    input_port = context.test.get_node_by_name("to_nifi")
+    input_port.drop_empty_flowfiles = True
+
+@given("a file with the content \"{content}\" is present in \"{path}\"")
+def step_impl(context, content, path):
+    context.test.add_test_data(path, content)
+
+# NiFi setups
+
+@given("a NiFi flow \"{cluster_name}\" receiving data from a 
RemoteProcessGroup \"{source_name}\" on port {port}")
+def step_impl(context, cluster_name, source_name, port):
+    remote_process_group = 
context.test.get_remote_process_group_by_name("RemoteProcessGroup")
+    source = 
context.test.generate_input_port_for_remote_process_group(remote_process_group, 
"from-minifi")
+    context.test.add_node(source)
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name('nifi')
+    cluster.set_engine('nifi')
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_flow(source)
+
+@given("in the \"{cluster_name}\" flow the \"{relationship}\" relationship of 
the {source_name} processor is connected to the {destination_name}")
+def step_impl(context, cluster_name, relationship, source_name, 
destination_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    source = context.test.get_or_create_node_by_name(source_name)
+    destination = context.test.get_or_create_node_by_name(destination_name)
+    source.out_proc.connect({relationship: destination})
+    if cluster.get_flow() is None:
+        cluster.set_flow(source)
+
+# HTTP proxy setup
+
+@given("the http proxy server \"{cluster_name}\" is set up")
+@given("a http proxy server \"{cluster_name}\" is set up accordingly")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name(cluster_name)
+    cluster.set_engine("http-proxy")
+    cluster.set_flow(None)
+
+# TLS
+# 
+@given("an ssl context service set up for {producer_name} and {consumer_name}")
+def step_impl(context, producer_name, consumer_name):
+    cert, key = gen_cert()
+    crt_file = '/tmp/resources/test-crt.pem'
+    ssl_context_service = SSLContextService(cert=crt_file, ca_cert=crt_file)
+    context.test.put_test_resource('test-crt.pem', cert.as_pem() + 
key.as_pem(None, rsa_gen_key_callback))
+    producer = context.test.get_node_by_name(producer_name)
+    producer.controller_services.append(ssl_context_service)
+    producer.set_property("SSL Context Service", ssl_context_service.name)
+    consumer = context.test.get_node_by_name(consumer_name)
+    consumer.set_property("SSL Certificate", crt_file)
+    consumer.set_property("SSL Verify Peer", "no")
+
+# Kafka setup
+
+@given("a kafka broker \"{cluster_name}\" is set up in correspondence with the 
PublishKafka")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name(cluster_name)
+    cluster.set_engine("kafka-broker")
+    cluster.set_flow(None)
+
+# s3 setup
+
+@given("a s3 server \"{cluster_name}\" is set up in correspondence with the 
PutS3Object")
+@given("a s3 server \"{cluster_name}\" is set up in correspondence with the 
DeleteS3Object")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name(cluster_name)
+    cluster.set_engine("s3-server")
+    cluster.set_flow(None)
+
+@when("the MiNiFi instance starts up")
+@when("both instances start up")
+@when("all instances start up")
+def step_impl(context):
+    context.test.start()
+
+@then("a flowfile with the content \"{content}\" is placed in the monitored 
directory in less than {duration}")
+def step_impl(context, content, duration):
+    context.test.check_for_file_with_content_generated(content, 
timeparse(duration))
+
+@then("at least one empty flowfile is placed in the monitored directory in 
less than {duration}")
+def step_impl(context, duration):
+    context.test.check_for_multiple_empty_files_generated(timeparse(duration))
+
+@then("no files are placed in the monitored directory in {duration} of running 
time")
+def step_impl(context, duration):
+    context.test.check_for_no_files_generated(timeparse(duration))
+
+@then("no errors were generated on the \"{cluster_name}\" regarding \"{url}\"")
+def step_impl(context, cluster_name, url):
+    context.test.check_http_proxy_access(cluster_name, url)
+
+@then("the object on the \"{cluster_name}\" s3 server is \"{object_data}\"")
+def step_impl(context, cluster_name, object_data):
+    context.test.check_s3_server_object_data(cluster_name, object_data)
+
+@then("the object content type on the \"{cluster_name}\" s3 server is 
\"{content_type}\" and the object metadata matches use metadata")
+def step_impl(context, cluster_name, content_type):
+    context.test.check_s3_server_object_metadata(cluster_name, content_type)
+
+@then("the object bucket on the \"{cluster_name}\" s3 server is empty")
+def step_impl(context, cluster_name):
+    context.test.check_empty_s3_bucket(cluster_name)
diff --git a/docker/test/integration/test_filesystem_ops.py 
b/docker/test/integration/test_filesystem_ops.py
deleted file mode 100644
index 5b48e6e..0000000
--- a/docker/test/integration/test_filesystem_ops.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_get_put():
-    """
-    Verify basic file get/put operations.
-    """
-
-    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(flow)
-
-        assert cluster.check_output()
-
-
-def test_file_exists_failure():
-    """
-    Verify that putting to a file that already exists fails.
-    """
-
-    flow = (GetFile('/tmp/input')
-
-            # First put should succeed
-            >> PutFile('/tmp')
-
-            # Second put should fail (file exists)
-            >> PutFile('/tmp')
-            >> (('success', LogAttribute()),
-                ('failure', LogAttribute() >> PutFile('/tmp/output'))))
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(flow)
-
-        assert cluster.check_output()
diff --git a/docker/test/integration/test_filter_zero_file.py 
b/docker/test/integration/test_filter_zero_file.py
deleted file mode 100644
index 918582b..0000000
--- a/docker/test/integration/test_filter_zero_file.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_filter_zero_file():
-    """
-    Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol.
-    """
-
-    port = InputPort('from-minifi', 
RemoteProcessGroup('http://nifi:8080/nifi'))
-
-    recv_flow = (port
-                 >> LogAttribute()
-                 >> PutFile('/tmp/output'))
-
-    send_flow = (GenerateFlowFile('0B')
-                 >> LogAttribute()
-                 >> ~port)
-
-    with DockerTestCluster(NoFileOutPutValidator()) as cluster:
-        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
-        cluster.deploy_flow(send_flow)
-        assert cluster.check_output(120)
diff --git a/docker/test/integration/test_hash_content.py 
b/docker/test/integration/test_hash_content.py
deleted file mode 100644
index e60d3a3..0000000
--- a/docker/test/integration/test_hash_content.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_hash_invoke():
-    """
-    Verify sending using InvokeHTTP to a receiver using ListenHTTP.
-    """
-    invoke_flow = (GetFile('/tmp/input') >> 
Processor(name='HashContent',clazz='HashContent',properties={'Hash Attribute': 
'hash'},auto_terminate=['failure']) 
-                   >> InvokeHTTP('http://minifi-listen:8080/contentListener', 
method='POST'))
-
-    listen_flow = ListenHTTP(8080)  >> LogAttribute() >>  
PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
diff --git a/docker/test/integration/test_http.py 
b/docker/test/integration/test_http.py
deleted file mode 100644
index f9431d4..0000000
--- a/docker/test/integration/test_http.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_invoke_listen():
-    """
-    Verify sending using InvokeHTTP to a receiver using ListenHTTP.
-    """
-    invoke_flow = (GetFile('/tmp/input')
-                   >> LogAttribute()
-                   >> InvokeHTTP('http://minifi-listen:8080/contentListener', 
method='POST'))
-
-    listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
-
-def test_invoke_listen_with_proxy():
-    """
-    Verify sending through a proxy using InvokeHTTP to a receiver using 
ListenHTTP.
-    """
-    invoke_flow = (GetFile('/tmp/input')
-                   >> LogAttribute()
-                   >> InvokeHTTP('http://minifi-listen:8080/contentListener',
-                                 method='POST',
-                                 proxy_host='http-proxy',
-                                 proxy_port='3128',
-                                 proxy_username='admin',
-                                 proxy_password='test101'))
-
-    listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
-        assert 
cluster.check_http_proxy_access("http://minifi-listen:8080/contentListener")
diff --git a/docker/test/integration/test_rdkafka.py 
b/docker/test/integration/test_rdkafka.py
deleted file mode 100644
index bea36db..0000000
--- a/docker/test/integration/test_rdkafka.py
+++ /dev/null
@@ -1,98 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_publish_kafka():
-    """
-    Verify delivery of message to kafka broker
-    """
-    producer_flow = GetFile('/tmp/input') >> PublishKafka() \
-                        >> (('failure', LogAttribute()),
-                            ('success', PutFile('/tmp/output/success')))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='kafka-broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', 
engine='minifi-cpp')
-
-        assert cluster.check_output(30)
-
-def test_no_broker():
-    """
-    Verify failure case when broker is down
-    """
-    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
-                        >> (('failure', PutFile('/tmp/output')),
-                            ('success', LogAttribute())))
-
-    with DockerTestCluster(SingleFileOutputValidator('no broker')) as cluster:
-        cluster.put_test_data('no broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', 
engine='minifi-cpp')
-
-        assert cluster.check_output(60)
-
-def test_broker_on_off():
-    """
-    Verify delivery of message when broker is unstable
-    """
-    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
-                     >> (('success', PutFile('/tmp/output/success')),
-                         ('failure', PutFile('/tmp/output/failure'))))
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='kafka-broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', 
engine='minifi-cpp')
-        start_count = 1
-        stop_count = 0
-
-        def start_kafka():
-            nonlocal start_count
-            assert cluster.start_flow('kafka-broker')
-            assert cluster.start_flow('kafka-consumer')
-            start_count += 1
-            assert cluster.wait_for_container_logs('zookeeper', 'Established 
session', 30, start_count)
-        def stop_kafka():
-            nonlocal stop_count
-            assert cluster.stop_flow('kafka-consumer')
-            assert cluster.stop_flow('kafka-broker')
-            stop_count += 1
-            assert cluster.wait_for_container_logs('zookeeper', 'Processed 
session termination for sessionid', 30, stop_count)
-
-        assert cluster.check_output(30, subdir='success')
-        stop_kafka()
-        assert cluster.check_output(60, subdir='failure')
-        start_kafka()
-        cluster.rm_out_child('success')
-        assert cluster.check_output(60, subdir='success')
-        stop_kafka()
-        cluster.rm_out_child('failure')
-        assert cluster.check_output(60, subdir='failure')
-
-def test_ssl():
-    """
-    Verify security connection
-    """
-    producer_flow = GetFile('/tmp/input') >> PublishKafkaSSL() \
-                    >> (('failure', LogAttribute()),
-                        ('success', PutFile('/tmp/output/ssl')))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='ssl')) as 
cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='kafka-broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', 
engine='minifi-cpp')
-
-        assert cluster.check_output(30)
diff --git a/docker/test/integration/test_s2s.py 
b/docker/test/integration/test_s2s.py
deleted file mode 100644
index ff1a312..0000000
--- a/docker/test/integration/test_s2s.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_minifi_to_nifi():
-    """
-    Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol.
-    """
-
-    port = InputPort('from-minifi', 
RemoteProcessGroup('http://nifi:8080/nifi'))
-
-    recv_flow = (port
-                 >> LogAttribute()
-                 >> PutFile('/tmp/output'))
-
-    send_flow = (GetFile('/tmp/input')
-                 >> LogAttribute()
-                 >> port)
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
-        cluster.deploy_flow(send_flow)
-
-        assert cluster.check_output(120)
diff --git a/docker/test/integration/test_s3.py 
b/docker/test/integration/test_s3.py
deleted file mode 100644
index a4bcebe..0000000
--- a/docker/test/integration/test_s3.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_put_s3_object():
-    """
-    Verify delivery of S3 object to AWS server
-    """
-    flow = (GetFile('/tmp/input') >> PutS3Object() \
-                 >> LogAttribute() \
-                 >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('LH_O#L|FD<FASD{FO#@$#$%^ 
"#"$L%:"@#$L":test_data#$#%#$%?{"F{')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-
-        assert cluster.check_output(60)
-
-        assert cluster.check_s3_server_object_data()
-        assert cluster.check_s3_server_object_metadata()
-
-def test_put_s3_object_proxy():
-    """
-    Verify delivery of S3 object to AWS server through proxy server
-    """
-    flow = (GetFile('/tmp/input') \
-            >> PutS3Object(proxy_host='http-proxy',
-                           proxy_port='3128',
-                           proxy_username='admin',
-                           proxy_password='test101') \
-            >> LogAttribute() \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('LH_O#L|FD<FASD{FO#@$#$%^ 
"#"$L%:"@#$L":test_data#$#%#$%?{"F{')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-
-        assert cluster.check_output(60)
-
-        assert 
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
-        assert cluster.check_s3_server_object_data()
-        assert cluster.check_s3_server_object_metadata()
-
-def test_delete_s3_object():
-    """
-    Verify deletion of S3 object
-    """
-    flow = (GetFile('/tmp/input') >> PutS3Object() \
-            >> LogAttribute() \
-            >> DeleteS3Object() \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-        assert cluster.check_output(60)
-        assert cluster.is_s3_bucket_empty()
-
-def test_delete_s3_non_existing_object():
-    """
-    Verify deletion of a non-existing S3 object should succeed
-    """
-    flow = (GetFile('/tmp/input')
-            >> DeleteS3Object() \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-        assert cluster.check_output(60)
-
-def test_delete_s3_object_proxy():
-    """
-    Verify deletion of S3 object through proxy server
-    """
-    flow = (GetFile('/tmp/input') >> PutS3Object() \
-            >> LogAttribute() \
-            >> DeleteS3Object(proxy_host='http-proxy',
-                              proxy_port='3128',
-                              proxy_username='admin',
-                              proxy_password='test101') \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-        assert cluster.check_output(60)
-        assert cluster.is_s3_bucket_empty()
-        assert 
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
-
-def test_fetch_s3_object():
-    """
-    Verify fetch of S3 object
-    """
-    put_flow = (GetFile('/tmp/input') >> PutS3Object())
-    fetch_flow = (GenerateFlowFile("1 kB", schedule={'scheduling period': '5 
sec'}) >> FetchS3Object() >> LogAttribute() >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(put_flow, engine='minifi-cpp', 
name='minifi-cpp-put')
-        cluster.deploy_flow(fetch_flow, engine='minifi-cpp', 
name='minifi-cpp-fetch')
-        assert cluster.check_output(60)
-
-def test_fetch_s3_object_proxy():
-    """
-    Verify fetch of S3 object
-    """
-    put_flow = (GetFile('/tmp/input') >> PutS3Object() >> LogAttribute())
-    fetch_flow = (GenerateFlowFile("1 kB", schedule={'scheduling period': '5 
sec'}) \
-                  >> FetchS3Object(proxy_host='http-proxy',
-                                   proxy_port='3128',
-                                   proxy_username='admin',
-                                   proxy_password='test101') \
-                  >> LogAttribute() >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', 
subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(put_flow, engine='minifi-cpp', 
name='minifi-cpp-put')
-        cluster.deploy_flow(fetch_flow, engine='minifi-cpp', 
name='minifi-cpp-fetch')
-        assert cluster.check_output(60)
-        assert 
cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
diff --git a/docker/test/integration/test_zero_file.py 
b/docker/test/integration/test_zero_file.py
deleted file mode 100644
index 1a0cf05..0000000
--- a/docker/test/integration/test_zero_file.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_zero_file():
-    """
-    Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol.
-    """
-
-    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
-
-    recv_flow = (port
-                 >> LogAttribute(schedule={'scheduling strategy': 'TIMER_DRIVEN'})
-                 >> PutFile('/tmp/output'))
-
-    send_flow = (GenerateFlowFile('0B')
-                 >> LogAttribute()
-                 >> port)
-
-    with DockerTestCluster(EmptyFilesOutPutValidator()) as cluster:
-        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
-        cluster.deploy_flow(send_flow)
-        assert cluster.check_output(120)
diff --git a/docker/test/test_https.py b/docker/test/test_https.py
deleted file mode 100644
index 2ea1bed..0000000
--- a/docker/test/test_https.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-
-from M2Crypto import X509, EVP, RSA, ASN1
-
-from minifi import *
-
-def callback():
-    pass
-
-
-def test_invoke_listen_https_one_way():
-    """
-    Verify sending using InvokeHTTP to a receiver using ListenHTTP (with TLS).
-    """
-
-    cert, key = gen_cert()
-
-    # TODO define SSLContextService class & generate config yml for services
-    crt_file = '/tmp/resources/test-crt.pem'
-
-    invoke_flow = (GetFile('/tmp/input')
-                   >> InvokeHTTP('https://minifi-listen:4430/contentListener',
-                                 method='POST',
-                                 ssl_context_service=SSLContextService(cert=crt_file, ca_cert=crt_file)))
-
-    listen_flow = (ListenHTTP(4430, cert=crt_file)
-                   >> LogAttribute()
-                   >> PutFile('/tmp/output'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, callback))
-        cluster.put_test_data('test')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
-
-
-def gen_cert():
-    """
-    Generate TLS certificate request for testing
-    """
-
-    req, key = gen_req()
-    pub_key = req.get_pubkey()
-    subject = req.get_subject()
-    cert = X509.X509()
-    # noinspection PyTypeChecker
-    cert.set_serial_number(1)
-    cert.set_version(2)
-    cert.set_subject(subject)
-    t = int(time.time())
-    now = ASN1.ASN1_UTCTIME()
-    now.set_time(t)
-    now_plus_year = ASN1.ASN1_UTCTIME()
-    now_plus_year.set_time(t + 60 * 60 * 24 * 365)
-    cert.set_not_before(now)
-    cert.set_not_after(now_plus_year)
-    issuer = X509.X509_Name()
-    issuer.C = 'US'
-    issuer.CN = 'minifi-listen'
-    cert.set_issuer(issuer)
-    cert.set_pubkey(pub_key)
-    cert.sign(key, 'sha256')
-
-    return cert, key
-
-
-def gen_req():
-    """
-    Generate TLS certificate request for testing
-    """
-
-    logging.info('Generating test certificate request')
-    key = EVP.PKey()
-    req = X509.Request()
-    rsa = RSA.gen_key(1024, 65537, callback)
-    key.assign_rsa(rsa)
-    req.set_pubkey(key)
-    name = req.get_subject()
-    name.C = 'US'
-    name.CN = 'minifi-listen'
-    req.sign(key, 'sha256')
-
-    return req, key

Reply via email to