This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 34a79a9e319a198169da760e3f77a5797f3f7f2d
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Nov 17 17:13:40 2025 +0100

    MINIFICPP-2680  Move Amazon Kinesis tests to modular docker tests
    
    Closes #2074
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
---
 docker/test/integration/cluster/ContainerStore.py  | 18 ------
 .../test/integration/cluster/DockerTestCluster.py  | 22 --------
 docker/test/integration/cluster/ImageStore.py      |  5 --
 .../integration/cluster/checkers/AwsChecker.py     | 66 ----------------------
 .../cluster/containers/KinesisServerContainer.py   | 41 --------------
 .../cluster/containers/S3ServerContainer.py        | 40 -------------
 .../features/MiNiFi_integration_test_driver.py     | 15 -----
 docker/test/integration/features/steps/steps.py    | 40 -------------
 .../minifi/processors/DeleteS3Object.py            | 42 --------------
 .../integration/minifi/processors/FetchS3Object.py | 41 --------------
 .../test/integration/minifi/processors/ListS3.py   | 41 --------------
 .../minifi/processors/PutKinesisStream.py          | 42 --------------
 .../integration/minifi/processors/PutS3Object.py   | 43 --------------
 .../aws/tests}/features/kinesis.feature            | 22 +++++---
 .../features}/resources/kinesis-mock/Dockerfile    |  0
 .../resources/kinesis-mock/consumer/consumer.js    |  0
 .../resources/kinesis-mock/consumer/package.json   |  0
 .../features}/resources/kinesis-mock/server.json   |  0
 .../features/steps/kinesis_server_container.py     | 50 ++++++++++++++++
 .../tests/features/steps/s3_server_container.py    |  4 --
 extensions/aws/tests/features/steps/steps.py       | 15 ++++-
 21 files changed, 77 insertions(+), 470 deletions(-)

diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py
index 10b093dee..6d87739bb 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -18,8 +18,6 @@ from .containers.MinifiContainer import MinifiOptions
 from .containers.MinifiContainer import MinifiContainer
 from .containers.NifiContainer import NifiContainer
 from .containers.NifiContainer import NiFiOptions
-from .containers.KinesisServerContainer import KinesisServerContainer
-from .containers.S3ServerContainer import S3ServerContainer
 from .containers.AzureStorageServerContainer import AzureStorageServerContainer
 from .containers.HttpProxyContainer import HttpProxyContainer
 from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer
@@ -115,22 +113,6 @@ class ContainerStore:
                                                              network=self.network,
                                                              image_store=self.image_store,
                                                              command=command))
-        elif engine == 's3-server':
-            return self.containers.setdefault(container_name,
-                                              S3ServerContainer(feature_context=feature_context,
-                                                                name=container_name,
-                                                                vols=self.vols,
-                                                                network=self.network,
-                                                                image_store=self.image_store,
-                                                                command=command))
-        elif engine == 'kinesis-server':
-            return self.containers.setdefault(container_name,
-                                              KinesisServerContainer(feature_context=feature_context,
-                                                                     name=container_name,
-                                                                     vols=self.vols,
-                                                                     network=self.network,
-                                                                     image_store=self.image_store,
-                                                                     command=command))
         elif engine == 'azure-storage-server':
             return self.containers.setdefault(container_name,
                                               AzureStorageServerContainer(feature_context=feature_context,
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index 15714f5ea..93801e83e 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -25,7 +25,6 @@ from .LogSource import LogSource
 from .ContainerStore import ContainerStore
 from .DockerCommunicator import DockerCommunicator
 from .MinifiControllerExecutor import MinifiControllerExecutor
-from .checkers.AwsChecker import AwsChecker
 from .checkers.AzureChecker import AzureChecker
 from .checkers.PostgresChecker import PostgresChecker
 from .checkers.PrometheusChecker import PrometheusChecker
@@ -40,7 +39,6 @@ class DockerTestCluster:
         self.vols = {}
         self.container_communicator = DockerCommunicator()
         self.container_store = ContainerStore(self.container_communicator.create_docker_network(feature_id), context.image_store, context.kubernetes_proxy, feature_id=feature_id)
-        self.aws_checker = AwsChecker(self.container_communicator)
         self.azure_checker = AzureChecker(self.container_communicator)
         self.postgres_checker = PostgresChecker(self.container_communicator)
         self.prometheus_checker = PrometheusChecker()
@@ -190,26 +188,6 @@ class DockerTestCluster:
                  and output.count("TCP_MISS") >= output.count("TCP_DENIED"))
                  or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output)
 
-    def check_kinesis_server_record_data(self, container_name, record_data):
-        container_name = self.container_store.get_container_name_with_postfix(container_name)
-        return self.aws_checker.check_kinesis_server_record_data(container_name, record_data)
-
-    def check_s3_server_object_data(self, container_name, test_data):
-        container_name = self.container_store.get_container_name_with_postfix(container_name)
-        return self.aws_checker.check_s3_server_object_data(container_name, test_data)
-
-    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
-        container_name = self.container_store.get_container_name_with_postfix(container_name)
-        return self.aws_checker.check_s3_server_object_hash(container_name, expected_file_hash)
-
-    def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
-        container_name = self.container_store.get_container_name_with_postfix(container_name)
-        return self.aws_checker.check_s3_server_object_metadata(container_name, content_type, metadata)
-
-    def is_s3_bucket_empty(self, container_name):
-        container_name = self.container_store.get_container_name_with_postfix(container_name)
-        return self.aws_checker.is_s3_bucket_empty(container_name)
-
     def check_azure_storage_server_data(self, container_name, test_data):
         container_name = self.container_store.get_container_name_with_postfix(container_name)
         return self.azure_checker.check_azure_storage_server_data(container_name, test_data)
diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py
index 360af8a7c..50025e37a 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -65,8 +65,6 @@ class ImageStore:
             image = self.__build_postgresql_server_image()
         elif container_engine == "mqtt-broker":
             image = self.__build_mqtt_broker_image()
-        elif container_engine == "kinesis-server":
-            image = self.__build_kinesis_image()
         else:
             raise Exception("There is no associated image for " + container_engine)
 
@@ -288,9 +286,6 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
-    def __build_kinesis_image(self):
-        return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server')
-
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/cluster/checkers/AwsChecker.py b/docker/test/integration/cluster/checkers/AwsChecker.py
deleted file mode 100644
index c9e4ea057..000000000
--- a/docker/test/integration/cluster/checkers/AwsChecker.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import json
-from utils import retry_check
-
-
-class AwsChecker:
-    def __init__(self, container_communicator):
-        self.container_communicator = container_communicator
-
-    @retry_check()
-    def check_kinesis_server_record_data(self, container_name, record_data):
-        (code, output) = self.container_communicator.execute_command(container_name, ["node", "/app/consumer/consumer.js", record_data])
-        return code == 0
-
-    @retry_check()
-    def check_s3_server_object_data(self, container_name, test_data):
-        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
-        if code != 0:
-            return False
-        s3_mock_dir = output.strip()
-        (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
-        return code == 0 and file_data == test_data
-
-    @retry_check()
-    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
-        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
-        if code != 0:
-            return False
-        dir_candidates = output.split("\n")
-        for candidate in dir_candidates:
-            if "multiparts" not in candidate:
-                s3_mock_dir = candidate
-                break
-        (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
-        if code != 0:
-            return False
-        file_hash = md5_output.split(' ')[0].strip()
-        return file_hash == expected_file_hash
-
-    @retry_check()
-    def check_s3_server_object_metadata(self, container_name, content_type="application/octet-stream", metadata=dict()):
-        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
-        if code != 0:
-            return False
-        s3_mock_dir = output.strip()
-        (code, output) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/objectMetadata.json"])
-        server_metadata = json.loads(output)
-        return code == 0 and server_metadata["contentType"] == content_type and metadata == server_metadata["userMetadata"]
-
-    @retry_check()
-    def is_s3_bucket_empty(self, container_name):
-        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
-        return code == 0 and not output.strip()
diff --git a/docker/test/integration/cluster/containers/KinesisServerContainer.py b/docker/test/integration/cluster/containers/KinesisServerContainer.py
deleted file mode 100644
index 0bee46cef..000000000
--- a/docker/test/integration/cluster/containers/KinesisServerContainer.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-from .Container import Container
-
-
-class KinesisServerContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, command):
-        super().__init__(feature_context, name, 'kinesis-server', vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "Starting Kinesis Plain Mock Service on port 4568"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running kinesis server docker container...')
-        self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            environment=["INITIALIZE_STREAMS=test_stream:3",
-                         "LOG_LEVEL=DEBUG"],
-            entrypoint=self.command)
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/cluster/containers/S3ServerContainer.py b/docker/test/integration/cluster/containers/S3ServerContainer.py
deleted file mode 100644
index 1b4695759..000000000
--- a/docker/test/integration/cluster/containers/S3ServerContainer.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-from .Container import Container
-
-
-class S3ServerContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, command):
-        super().__init__(feature_context, name, 's3-server', vols, network, image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "Started S3MockApplication"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running s3 server docker container...')
-        self.client.containers.run(
-            "adobe/s3mock:3.12.0",
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            environment=["initialBuckets=test_bucket"],
-            entrypoint=self.command)
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 30c3d7f72..d3dd8dd5c 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -275,21 +275,6 @@ class MiNiFi_integration_test:
         assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
         assert validator.validate() or self.cluster.log_app_output()
 
-    def check_kinesis_server_record_data(self, kinesis_container_name, record_data):
-        assert self.cluster.check_kinesis_server_record_data(kinesis_container_name, record_data) or self.cluster.log_app_output()
-
-    def check_s3_server_object_data(self, s3_container_name, object_data):
-        assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output()
-
-    def check_s3_server_large_object_data(self, s3_container_name: str):
-        assert self.cluster.check_s3_server_object_hash(s3_container_name, self.test_file_hash) or self.cluster.log_app_output()
-
-    def check_s3_server_object_metadata(self, s3_container_name, content_type):
-        assert self.cluster.check_s3_server_object_metadata(s3_container_name, content_type) or self.cluster.log_app_output()
-
-    def check_empty_s3_bucket(self, s3_container_name):
-        assert self.cluster.is_s3_bucket_empty(s3_container_name) or self.cluster.log_app_output()
-
     def check_http_proxy_access(self, http_proxy_container_name, url):
         assert self.cluster.check_http_proxy_access(http_proxy_container_name, url) or self.cluster.log_app_output()
 
diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py
index 1150a569a..9757441dd 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -120,11 +120,8 @@ def step_impl(context, processor_type, minifi_container_name):
 
 
 @given("a {processor_type} processor")
-@given("a {processor_type} processor set up to communicate with an s3 server")
-@given("a {processor_type} processor set up to communicate with the same s3 
server")
 @given("a {processor_type} processor set up to communicate with an Azure blob 
storage")
 @given("a {processor_type} processor set up to communicate with an MQTT broker 
instance")
-@given("a {processor_type} processor set up to communicate with the kinesis 
server")
 def step_impl(context, processor_type):
     __create_processor(context, processor_type, processor_type, None, None, 
"minifi-cpp-flow")
 
@@ -499,18 +496,6 @@ def step_impl(context):
     context.test.start('mqtt-broker')
 
 
-# s3 setup
-@given("a s3 server is set up in correspondence with the PutS3Object")
-@given("a s3 server is set up in correspondence with the DeleteS3Object")
-def step_impl(context):
-    context.test.acquire_container(context=context, name="s3-server", engine="s3-server")
-
-
-@given("a kinesis server is set up in correspondence with the PutKinesisStream")
-def step_impl(context):
-    context.test.acquire_container(context=context, name="kinesis-server", engine="kinesis-server")
-
-
 # azure storage setup
 @given("an Azure storage server is set up")
 def step_impl(context):
@@ -730,31 +715,6 @@ def step_impl(context, url):
     context.test.check_http_proxy_access('http-proxy', url)
 
 
-@then("there is a record on the kinesis server with \"{record_data}\"")
-def step_impl(context, record_data):
-    context.test.check_kinesis_server_record_data("kinesis-server", record_data)
-
-
-@then("the object on the s3 server is \"{object_data}\"")
-def step_impl(context, object_data):
-    context.test.check_s3_server_object_data("s3-server", object_data)
-
-
-@then("the object on the s3 server is present and matches the original hash")
-def step_impl(context):
-    context.test.check_s3_server_large_object_data("s3-server")
-
-
-@then("the object content type on the s3 server is \"{content_type}\" and the 
object metadata matches use metadata")
-def step_impl(context, content_type):
-    context.test.check_s3_server_object_metadata("s3-server", content_type)
-
-
-@then("the object bucket on the s3 server is empty")
-def step_impl(context):
-    context.test.check_empty_s3_bucket("s3-server")
-
-
 # Azure
 @when("test blob \"{blob_name}\" with the content \"{content}\" is created on 
Azure blob storage")
 def step_impl(context, blob_name, content):
diff --git a/docker/test/integration/minifi/processors/DeleteS3Object.py b/docker/test/integration/minifi/processors/DeleteS3Object.py
deleted file mode 100644
index 0f56c02cf..000000000
--- a/docker/test/integration/minifi/processors/DeleteS3Object.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class DeleteS3Object(Processor):
-    def __init__(
-            self,
-            context,
-            proxy_host='',
-            proxy_port='',
-            proxy_username='',
-            proxy_password=''):
-        super(DeleteS3Object, self).__init__(
-            context=context,
-            clazz='DeleteS3Object',
-            properties={
-                'Object Key': 'test_object_key',
-                'Bucket': 'test_bucket',
-                'Access Key': 'test_access_key',
-                'Secret Key': 'test_secret',
-                'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090",
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'Proxy Username': proxy_username,
-                'Proxy Password': proxy_password,
-            },
-            auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/FetchS3Object.py b/docker/test/integration/minifi/processors/FetchS3Object.py
deleted file mode 100644
index 0c60096da..000000000
--- a/docker/test/integration/minifi/processors/FetchS3Object.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class FetchS3Object(Processor):
-    def __init__(self,
-                 context,
-                 proxy_host='',
-                 proxy_port='',
-                 proxy_username='',
-                 proxy_password=''):
-        super(FetchS3Object, self).__init__(
-            context=context,
-            clazz='FetchS3Object',
-            properties={
-                'Object Key': 'test_object_key',
-                'Bucket': 'test_bucket',
-                'Access Key': 'test_access_key',
-                'Secret Key': 'test_secret',
-                'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090",
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'Proxy Username': proxy_username,
-                'Proxy Password': proxy_password,
-            },
-            auto_terminate=['success', 'failure'])
diff --git a/docker/test/integration/minifi/processors/ListS3.py b/docker/test/integration/minifi/processors/ListS3.py
deleted file mode 100644
index 9f54064a2..000000000
--- a/docker/test/integration/minifi/processors/ListS3.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class ListS3(Processor):
-    def __init__(self,
-                 context,
-                 proxy_host='',
-                 proxy_port='',
-                 proxy_username='',
-                 proxy_password=''):
-        super(ListS3, self).__init__(
-            context=context,
-            clazz='ListS3',
-            properties={
-                'Bucket': 'test_bucket',
-                'Access Key': 'test_access_key',
-                'Secret Key': 'test_secret',
-                'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090",
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'Proxy Username': proxy_username,
-                'Proxy Password': proxy_password,
-            },
-            schedule={'scheduling period': '2 sec'},
-            auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/PutKinesisStream.py b/docker/test/integration/minifi/processors/PutKinesisStream.py
deleted file mode 100644
index 40e55e1c2..000000000
--- a/docker/test/integration/minifi/processors/PutKinesisStream.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class PutKinesisStream(Processor):
-    def __init__(
-            self,
-            context,
-            proxy_host='',
-            proxy_port='',
-            proxy_username='',
-            proxy_password=''):
-        super(PutKinesisStream, self).__init__(
-            context=context,
-            clazz='PutKinesisStream',
-            properties={
-                'Amazon Kinesis Stream Name': 'test_stream',
-                'Access Key': 'test_access_key',
-                'Secret Key': 'test_secret',
-                'Endpoint Override URL': f"http://kinesis-server-{context.feature_id}:4568",
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'Proxy Username': proxy_username,
-                'Proxy Password': proxy_password,
-                'Region': 'us-east-1'
-            },
-            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py b/docker/test/integration/minifi/processors/PutS3Object.py
deleted file mode 100644
index ac33cb43a..000000000
--- a/docker/test/integration/minifi/processors/PutS3Object.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class PutS3Object(Processor):
-    def __init__(
-            self,
-            context,
-            object_key='test_object_key',
-            proxy_host='',
-            proxy_port='',
-            proxy_username='',
-            proxy_password=''):
-        super(PutS3Object, self).__init__(
-            context=context,
-            clazz='PutS3Object',
-            properties={
-                'Object Key': object_key,
-                'Bucket': 'test_bucket',
-                'Access Key': 'test_access_key',
-                'Secret Key': 'test_secret',
-                'Endpoint Override URL': f"http://s3-server-{context.feature_id}:9090",
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'Proxy Username': proxy_username,
-                'Proxy Password': proxy_password
-            },
-            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/features/kinesis.feature b/extensions/aws/tests/features/kinesis.feature
similarity index 59%
rename from docker/test/integration/features/kinesis.feature
rename to extensions/aws/tests/features/kinesis.feature
index 95f5464fb..6f903cd56 100644
--- a/docker/test/integration/features/kinesis.feature
+++ b/extensions/aws/tests/features/kinesis.feature
@@ -19,21 +19,27 @@ Feature: Sending data from MiNiFi-C++ to an AWS Kinesis server
   As a user of MiNiFi
   I need to have PutKinesisStream processor
 
-  Background:
-    Given the content of "/tmp/output" is monitored
-
   Scenario: A MiNiFi instance can send data to AWS Kinesis
-    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    Given a kinesis server is set up in correspondence with the PutKinesisStream
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "Schnappi, das kleine Krokodil" is present in "/tmp/input"
-    And a PutKinesisStream processor set up to communicate with the kinesis server
+    And a PutKinesisStream processor
+    And these processor properties are set
+      | processor name     | property name              | property value                            |
+      | PutKinesisStream   | Amazon Kinesis Stream Name | test_stream                               |
+      | PutKinesisStream   | Access Key                 | test_access_key                           |
+      | PutKinesisStream   | Secret Key                 | test_secret                               |
+      | PutKinesisStream   | Endpoint Override URL      | http://kinesis-server-${scenario_id}:4568 |
+      | PutKinesisStream   | Region                     | us-east-1                                 |
+    And PutKinesisStream is EVENT_DRIVEN
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to the PutKinesisStream
     And the "success" relationship of the PutKinesisStream processor is connected to the PutFile
     And the "failure" relationship of the PutKinesisStream processor is connected to the PutKinesisStream
-
-    And a kinesis server is set up in correspondence with the PutKinesisStream
+    And PutFile's success relationship is auto-terminated
 
     When both instances start up
 
-    Then a flowfile with the content "Schnappi, das kleine Krokodil" is placed in the monitored directory in less than 60 seconds
+    Then a single file with the content "Schnappi, das kleine Krokodil" is placed in the "/tmp/output" directory in less than 60 seconds
     And there is a record on the kinesis server with "Schnappi, das kleine Krokodil"
diff --git a/docker/test/integration/resources/kinesis-mock/Dockerfile b/extensions/aws/tests/features/resources/kinesis-mock/Dockerfile
similarity index 100%
rename from docker/test/integration/resources/kinesis-mock/Dockerfile
rename to extensions/aws/tests/features/resources/kinesis-mock/Dockerfile
diff --git a/docker/test/integration/resources/kinesis-mock/consumer/consumer.js b/extensions/aws/tests/features/resources/kinesis-mock/consumer/consumer.js
similarity index 100%
rename from docker/test/integration/resources/kinesis-mock/consumer/consumer.js
rename to extensions/aws/tests/features/resources/kinesis-mock/consumer/consumer.js
diff --git a/docker/test/integration/resources/kinesis-mock/consumer/package.json b/extensions/aws/tests/features/resources/kinesis-mock/consumer/package.json
similarity index 100%
rename from docker/test/integration/resources/kinesis-mock/consumer/package.json
rename to extensions/aws/tests/features/resources/kinesis-mock/consumer/package.json
diff --git a/docker/test/integration/resources/kinesis-mock/server.json b/extensions/aws/tests/features/resources/kinesis-mock/server.json
similarity index 100%
rename from docker/test/integration/resources/kinesis-mock/server.json
rename to extensions/aws/tests/features/resources/kinesis-mock/server.json
diff --git a/extensions/aws/tests/features/steps/kinesis_server_container.py b/extensions/aws/tests/features/steps/kinesis_server_container.py
new file mode 100644
index 000000000..e92b33c41
--- /dev/null
+++ b/extensions/aws/tests/features/steps/kinesis_server_container.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from pathlib import Path
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.core.helpers import wait_for_condition, retry_check
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder
+
+
+class KinesisServerContainer(Container):
+    def __init__(self, test_context: MinifiTestContext):
+        builder = DockerImageBuilder(
+            image_tag="minifi-kinesis-mock:latest",
+            build_context_path=str(Path(__file__).resolve().parent.parent / "resources" / "kinesis-mock")
+        )
+        builder.build()
+
+        super().__init__("minifi-kinesis-mock:latest", f"kinesis-server-{test_context.scenario_id}", test_context.network)
+        self.environment.append("INITIALIZE_STREAMS=test_stream:3")
+        self.environment.append("LOG_LEVEL=DEBUG")
+
+    def deploy(self):
+        super().deploy()
+        finished_str = "Starting Kinesis Plain Mock Service on port 4568"
+        return wait_for_condition(
+            condition=lambda: finished_str in self.get_logs(),
+            timeout_seconds=300,
+            bail_condition=lambda: self.exited,
+            context=None)
+
+    @retry_check()
+    def check_kinesis_server_record_data(self, record_data):
+        (code, output) = self.exec_run(["node", "/app/consumer/consumer.js", record_data])
+        logging.info(f"Kinesis server returned output: '{output}' with code '{code}'")
+        return code == 0
diff --git a/extensions/aws/tests/features/steps/s3_server_container.py b/extensions/aws/tests/features/steps/s3_server_container.py
index bd1985f93..01db20030 100644
--- a/extensions/aws/tests/features/steps/s3_server_container.py
+++ b/extensions/aws/tests/features/steps/s3_server_container.py
@@ -35,10 +35,6 @@ class S3ServerContainer(Container):
             bail_condition=lambda: self.exited,
             context=None)
 
-    def check_kinesis_server_record_data(self, container_name, record_data):
-        (code, output) = self.exec_run(["node", "/app/consumer/consumer.js", record_data])
-        return code == 0
-
     def check_s3_server_object_data(self, test_data):
         (code, output) = self.exec_run(["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
         if code != 0:
diff --git a/extensions/aws/tests/features/steps/steps.py b/extensions/aws/tests/features/steps/steps.py
index 6f8d1d757..682685582 100644
--- a/extensions/aws/tests/features/steps/steps.py
+++ b/extensions/aws/tests/features/steps/steps.py
@@ -18,7 +18,7 @@ import random
 import string
 
 import humanfriendly
-from behave import step
+from behave import step, then
 
 from minifi_test_framework.containers.directory import Directory
 from minifi_test_framework.steps import checking_steps        # noqa: F401
@@ -27,9 +27,10 @@ from minifi_test_framework.steps import core_steps            # noqa: F401
 from minifi_test_framework.steps import flow_building_steps   # noqa: F401
 from minifi_test_framework.core.minifi_test_context import MinifiTestContext
 from minifi_test_framework.minifi.processor import Processor
-from minifi_test_framework.core.helpers import wait_for_condition
+from minifi_test_framework.core.helpers import wait_for_condition, log_due_to_failure
 
 from s3_server_container import S3ServerContainer
+from kinesis_server_container import KinesisServerContainer
 
 
 @step('a {processor_name} processor set up to communicate with an s3 server')
@@ -100,3 +101,13 @@ def step_impl(context):
     new_dir.files["input.txt"] = content
     context.get_or_create_default_minifi_container().dirs.append(new_dir)
     context.original_hash = computeMD5hash(content)
+
+
+@step("a kinesis server is set up in correspondence with the PutKinesisStream")
+def step_impl(context):
+    context.containers["kinesis-server"] = KinesisServerContainer(context)
+
+
+@then("there is a record on the kinesis server with \"{record_data}\"")
+def step_impl(context, record_data):
+    assert context.containers["kinesis-server"].check_kinesis_server_record_data(record_data) or log_due_to_failure(context)

