This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch MINIFICPP-2675
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 53ade0b94f41679d5b53cd4e15bb42155470e54e
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Nov 11 17:12:48 2025 +0100

    MINIFICPP-2675 Move Elasticsearch tests to modular docker tests
---
 docker/RunBehaveTests.sh                           |   3 +-
 docker/test/integration/cluster/ContainerStore.py  |  18 ----
 .../test/integration/cluster/DockerTestCluster.py  |  22 -----
 .../cluster/checkers/ElasticSearchChecker.py       |  50 ----------
 .../cluster/containers/ElasticsearchContainer.py   | 101 ---------------------
 .../cluster/containers/OpensearchContainer.py      |  80 ----------------
 .../features/MiNiFi_integration_test_driver.py     |  25 -----
 docker/test/integration/features/steps/steps.py    |  71 ---------------
 .../minifi/processors/PostElasticsearch.py         |  31 -------
 .../tests}/features/elasticsearch.feature          |  42 ++++++---
 .../elasticsearch/tests/features/environment.py    |  17 ++--
 .../tests}/features/opensearch.feature             |  45 +++++----
 .../tests/features/resources}/elasticsearch.yml    |   0
 .../tests/features/resources}/opensearch.yml       |   0
 .../tests/features/steps/elastic_base_container.py |  47 ++++++++++
 .../features/steps/elasticsearch_container.py      |  65 +++++++++++++
 .../tests/features/steps/opensearch_container.py   |  51 +++++++++++
 .../elasticsearch/tests/features/steps/steps.py    |  79 ++++++++++++++++
 18 files changed, 305 insertions(+), 442 deletions(-)

diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh
index 05297ec98..a93918b8a 100755
--- a/docker/RunBehaveTests.sh
+++ b/docker/RunBehaveTests.sh
@@ -200,4 +200,5 @@ exec \
     "${docker_dir}/../extensions/sql/tests/features" \
     "${docker_dir}/../extensions/opc/tests/features" \
     "${docker_dir}/../extensions/kafka/tests/features" \
-    "${docker_dir}/../extensions/couchbase/tests/features"
+    "${docker_dir}/../extensions/couchbase/tests/features" \
+    "${docker_dir}/../extensions/elasticsearch/tests/features"
diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index f225e1864..7c1c0d6c0 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -26,8 +26,6 @@ from .containers.HttpProxyContainer import HttpProxyContainer
 from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer
 from .containers.MqttBrokerContainer import MqttBrokerContainer
 from .containers.SplunkContainer import SplunkContainer
-from .containers.ElasticsearchContainer import ElasticsearchContainer
-from .containers.OpensearchContainer import OpensearchContainer
 from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer
 from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer
 from .containers.MinifiAsPodInKubernetesCluster import 
MinifiAsPodInKubernetesCluster
@@ -179,22 +177,6 @@ class ContainerStore:
                                                               
network=self.network,
                                                               
image_store=self.image_store,
                                                               command=command))
-        elif engine == 'elasticsearch':
-            return self.containers.setdefault(container_name,
-                                              
ElasticsearchContainer(feature_context=feature_context,
-                                                                     
name=container_name,
-                                                                     
vols=self.vols,
-                                                                     
network=self.network,
-                                                                     
image_store=self.image_store,
-                                                                     
command=command))
-        elif engine == 'opensearch':
-            return self.containers.setdefault(container_name,
-                                              
OpensearchContainer(feature_context=feature_context,
-                                                                  
name=container_name,
-                                                                  
vols=self.vols,
-                                                                  
network=self.network,
-                                                                  
image_store=self.image_store,
-                                                                  
command=command))
         elif engine == "syslog-udp-client":
             return self.containers.setdefault(container_name,
                                               SyslogUdpClientContainer(
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index 6d8df1b7e..d74f4aa37 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -28,7 +28,6 @@ from .DockerCommunicator import DockerCommunicator
 from .MinifiControllerExecutor import MinifiControllerExecutor
 from .checkers.AwsChecker import AwsChecker
 from .checkers.AzureChecker import AzureChecker
-from .checkers.ElasticSearchChecker import ElasticSearchChecker
 from .checkers.GcsChecker import GcsChecker
 from .checkers.PostgresChecker import PostgresChecker
 from .checkers.PrometheusChecker import PrometheusChecker
@@ -46,7 +45,6 @@ class DockerTestCluster:
         self.container_store = 
ContainerStore(self.container_communicator.create_docker_network(feature_id), 
context.image_store, context.kubernetes_proxy, feature_id=feature_id)
         self.aws_checker = AwsChecker(self.container_communicator)
         self.azure_checker = AzureChecker(self.container_communicator)
-        self.elastic_search_checker = 
ElasticSearchChecker(self.container_communicator)
         self.gcs_checker = GcsChecker(self.container_communicator)
         self.postgres_checker = PostgresChecker(self.container_communicator)
         self.splunk_checker = SplunkChecker(self.container_communicator)
@@ -260,26 +258,6 @@ class DockerTestCluster:
         container_name = 
self.container_store.get_container_name_with_postfix(container_name)
         return self.gcs_checker.is_gcs_bucket_empty(container_name)
 
-    def is_elasticsearch_empty(self, container_name):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return 
self.elastic_search_checker.is_elasticsearch_empty(container_name)
-
-    def create_doc_elasticsearch(self, container_name, index_name, doc_id):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return 
self.elastic_search_checker.create_doc_elasticsearch(container_name, 
index_name, doc_id)
-
-    def check_elastic_field_value(self, container_name, index_name, doc_id, 
field_name, field_value):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return 
self.elastic_search_checker.check_elastic_field_value(container_name, 
index_name, doc_id, field_name, field_value)
-
-    def elastic_generate_apikey(self, elastic_container_name):
-        elastic_container_name = 
self.container_store.get_container_name_with_postfix(elastic_container_name)
-        return 
self.elastic_search_checker.elastic_generate_apikey(elastic_container_name)
-
-    def add_elastic_user_to_opensearch(self, container_name):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return 
self.elastic_search_checker.add_elastic_user_to_opensearch(container_name)
-
     def check_query_results(self, postgresql_container_name, query, 
number_of_rows, timeout_seconds):
         postgresql_container_name = 
self.container_store.get_container_name_with_postfix(postgresql_container_name)
         return 
self.postgres_checker.check_query_results(postgresql_container_name, query, 
number_of_rows, timeout_seconds)
diff --git a/docker/test/integration/cluster/checkers/ElasticSearchChecker.py 
b/docker/test/integration/cluster/checkers/ElasticSearchChecker.py
deleted file mode 100644
index b5c4e5983..000000000
--- a/docker/test/integration/cluster/checkers/ElasticSearchChecker.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import json
-from utils import retry_check
-
-
-class ElasticSearchChecker:
-    def __init__(self, container_communicator):
-        self.container_communicator = container_communicator
-
-    @retry_check()
-    def is_elasticsearch_empty(self, container_name):
-        (code, output) = 
self.container_communicator.execute_command(container_name, ["curl", "-s", 
"-u", "elastic:password", "-k", "-XGET", "https://localhost:9200/_search"])
-        return code == 0 and '"hits":[]' in output
-
-    def create_doc_elasticsearch(self, container_name, index_name, doc_id):
-        (code, output) = 
self.container_communicator.execute_command(container_name, ["/bin/bash", "-c",
-                                                                               
       "curl -s -u elastic:password -k -XPUT https://localhost:9200/" + 
index_name + "/_doc/" + doc_id + " -H Content-Type:application/json 
-d'{\"field1\":\"value1\"}'"])
-        return code == 0 and ('"_id":"' + doc_id + '"') in output
-
-    def check_elastic_field_value(self, container_name, index_name, doc_id, 
field_name, field_value):
-        (code, output) = 
self.container_communicator.execute_command(container_name, ["/bin/bash", "-c",
-                                                                               
       "curl -s -u elastic:password -k -XGET https://localhost:9200/" + 
index_name + "/_doc/" + doc_id])
-        return code == 0 and (field_name + '":"' + field_value) in output
-
-    def elastic_generate_apikey(self, elastic_container_name):
-        (code, output) = 
self.container_communicator.execute_command(elastic_container_name, 
["/bin/bash", "-c",
-                                                                               
               "curl -s -u elastic:password -k -XPOST 
https://localhost:9200/_security/api_key -H Content-Type:application/json 
-d'{\"name\":\"my-api-key\",\"expiration\":\"1d\",\"role_descriptors\":{\"role-a\":
 {\"cluster\": [\"all\"],\"index\": [{\"names\": [\"my_index\"],\"privileges\": 
[\"all\"]}]}}}'"])
-        if code != 0:
-            return None
-        output_lines = output.splitlines()
-        result = json.loads(output_lines[-1])
-        return result["encoded"]
-
-    def add_elastic_user_to_opensearch(self, container_name):
-        (code, output) = 
self.container_communicator.execute_command(container_name, ["/bin/bash", "-c",
-                                                                               
       'curl -s -u admin:admin -k -XPUT 
https://{hostname}:9200/_plugins/_security/api/internalusers/elastic -H 
Content-Type:application/json 
-d\'{{"password":"password","backend_roles":["admin"]}}\''.format(hostname=container_name)])
-        return code == 0 and '"status":"CREATED"' in output
diff --git 
a/docker/test/integration/cluster/containers/ElasticsearchContainer.py 
b/docker/test/integration/cluster/containers/ElasticsearchContainer.py
deleted file mode 100644
index 4bab4cbd1..000000000
--- a/docker/test/integration/cluster/containers/ElasticsearchContainer.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-import tempfile
-import os
-import OpenSSL.crypto
-import docker.types
-
-from .Container import Container
-from ssl_utils.SSL_cert_utils import make_server_cert, 
make_cert_without_extended_usage
-
-
-class ElasticsearchContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, 
command=None):
-        super().__init__(feature_context, name, 'elasticsearch', vols, 
network, image_store, command)
-        http_cert, http_key = 
make_server_cert(f"elasticsearch-{feature_context.id}", 
feature_context.root_ca_cert, feature_context.root_ca_key)
-        transport_cert, transport_key = 
make_cert_without_extended_usage("127.0.0.1", feature_context.root_ca_cert, 
feature_context.root_ca_key)
-
-        self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=feature_context.root_ca_cert))
-        self.root_ca_file.close()
-        os.chmod(self.root_ca_file.name, 0o644)
-
-        self.http_cert_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.http_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=http_cert))
-        self.http_cert_file.close()
-        os.chmod(self.http_cert_file.name, 0o644)
-
-        self.http_key_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.http_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
 pkey=http_key))
-        self.http_key_file.close()
-        os.chmod(self.http_key_file.name, 0o644)
-
-        self.transport_cert_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.transport_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=transport_cert))
-        self.transport_cert_file.close()
-        os.chmod(self.transport_cert_file.name, 0o644)
-
-        self.transport_key_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.transport_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
 pkey=transport_key))
-        self.transport_key_file.close()
-        os.chmod(self.transport_key_file.name, 0o644)
-
-    def get_startup_finished_log_entry(self):
-        return '"current.health":"GREEN"'
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        mounts = [
-            docker.types.Mount(
-                type='bind',
-                source=os.environ['TEST_DIRECTORY'] + 
"/resources/elasticsearch/elasticsearch.yml",
-                target='/usr/share/elasticsearch/config/elasticsearch.yml'),
-            docker.types.Mount(
-                type='bind',
-                source=self.http_key_file.name,
-                
target='/usr/share/elasticsearch/config/certs/elastic_http.key'),
-            docker.types.Mount(
-                type='bind',
-                source=self.http_cert_file.name,
-                
target='/usr/share/elasticsearch/config/certs/elastic_http.crt'),
-            docker.types.Mount(
-                type='bind',
-                source=self.transport_key_file.name,
-                
target='/usr/share/elasticsearch/config/certs/elastic_transport.key'),
-            docker.types.Mount(
-                type='bind',
-                source=self.transport_cert_file.name,
-                
target='/usr/share/elasticsearch/config/certs/elastic_transport.crt'),
-            docker.types.Mount(
-                type='bind',
-                source=self.root_ca_file.name,
-                target='/usr/share/elasticsearch/config/certs/root_ca.crt')]
-
-        logging.info('Creating and running Elasticsearch docker container...')
-        self.client.containers.run(
-            image="elasticsearch:9.1.5",
-            detach=True,
-            name=self.name,
-            environment=[
-                "ELASTIC_PASSWORD=password",
-            ],
-            network=self.network.name,
-            mounts=mounts)
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/cluster/containers/OpensearchContainer.py 
b/docker/test/integration/cluster/containers/OpensearchContainer.py
deleted file mode 100644
index 06997a20a..000000000
--- a/docker/test/integration/cluster/containers/OpensearchContainer.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-import os
-import tempfile
-
-import OpenSSL.crypto
-import docker.types
-
-from .Container import Container
-from ssl_utils.SSL_cert_utils import make_server_cert
-
-
-class OpensearchContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, 
command=None):
-        super().__init__(feature_context, name, 'opensearch', vols, network, 
image_store, command)
-        cert, key = make_server_cert(f"opensearch-{feature_context.id}", 
feature_context.root_ca_cert, feature_context.root_ca_key)
-
-        self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=feature_context.root_ca_cert))
-        self.root_ca_file.close()
-        os.chmod(self.root_ca_file.name, 0o644)
-
-        self.admin_pem = tempfile.NamedTemporaryFile(delete=False)
-        
self.admin_pem.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=cert))
-        self.admin_pem.close()
-        os.chmod(self.admin_pem.name, 0o644)
-
-        self.admin_key = tempfile.NamedTemporaryFile(delete=False)
-        
self.admin_key.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
 pkey=key))
-        self.admin_key.close()
-        os.chmod(self.admin_key.name, 0o644)
-
-    def get_startup_finished_log_entry(self):
-        return 'Hot-reloading of audit configuration is enabled'
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-        mounts = [
-            docker.types.Mount(
-                type='bind',
-                source=os.environ['TEST_DIRECTORY'] + 
"/resources/opensearch/opensearch.yml",
-                target='/usr/share/opensearch/config/opensearch.yml'),
-            docker.types.Mount(
-                type='bind',
-                source=self.admin_pem.name,
-                target='/usr/share/opensearch/config/admin.pem'),
-            docker.types.Mount(
-                type='bind',
-                source=self.admin_key.name,
-                target='/usr/share/opensearch/config/admin-key.pem'),
-            docker.types.Mount(
-                type='bind',
-                source=self.root_ca_file.name,
-                target='/usr/share/opensearch/config/root-ca.pem')]
-
-        logging.info('Creating and running Opensearch docker container...')
-        self.client.containers.run(
-            image='opensearchproject/opensearch:2.6.0',
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            mounts=mounts)
-
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index d3d42936d..76fd3f9bb 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -70,16 +70,6 @@ class MiNiFi_integration_test:
         assert self.cluster.wait_for_container_startup_to_finish('splunk') or 
self.cluster.log_app_output()
         assert self.cluster.enable_splunk_hec_indexer('splunk', 
'splunk_hec_token') or self.cluster.log_app_output()
 
-    def start_elasticsearch(self, context):
-        self.cluster.acquire_container(context=context, name='elasticsearch', 
engine='elasticsearch')
-        self.cluster.deploy_container('elasticsearch')
-        assert 
self.cluster.wait_for_container_startup_to_finish('elasticsearch') or 
self.cluster.log_app_output()
-
-    def start_opensearch(self, context):
-        self.cluster.acquire_container(context=context, name='opensearch', 
engine='opensearch')
-        self.cluster.deploy_container('opensearch')
-        assert self.cluster.wait_for_container_startup_to_finish('opensearch') 
or self.cluster.log_app_output()
-
     def start_minifi_c2_server(self, context):
         self.cluster.acquire_container(context=context, 
name="minifi-c2-server", engine="minifi-c2-server")
         self.cluster.deploy_container('minifi-c2-server')
@@ -325,21 +315,6 @@ class MiNiFi_integration_test:
     def check_empty_gcs_bucket(self, gcs_container_name):
         assert self.cluster.is_gcs_bucket_empty(gcs_container_name) or 
self.cluster.log_app_output()
 
-    def check_empty_elastic(self, elastic_container_name):
-        assert self.cluster.is_elasticsearch_empty(elastic_container_name) or 
self.cluster.log_app_output()
-
-    def elastic_generate_apikey(self, elastic_container_name):
-        return self.cluster.elastic_generate_apikey(elastic_container_name) or 
self.cluster.log_app_output()
-
-    def create_doc_elasticsearch(self, elastic_container_name, index_name, 
doc_id):
-        assert self.cluster.create_doc_elasticsearch(elastic_container_name, 
index_name, doc_id) or self.cluster.log_app_output()
-
-    def check_elastic_field_value(self, elastic_container_name, index_name, 
doc_id, field_name, field_value):
-        assert self.cluster.check_elastic_field_value(elastic_container_name, 
index_name, doc_id, field_name, field_value) or self.cluster.log_app_output()
-
-    def add_elastic_user_to_opensearch(self, container_name):
-        assert self.cluster.add_elastic_user_to_opensearch(container_name) or 
self.cluster.log_app_output()
-
     def check_minifi_log_contents(self, line, timeout_seconds=60, count=1):
         self.check_container_log_contents("minifi-cpp", line, timeout_seconds, 
count)
 
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index 7074c3257..e1bba09f4 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -20,7 +20,6 @@ from minifi.core.Funnel import Funnel
 
 from minifi.controllers.SSLContextService import SSLContextService
 from minifi.controllers.GCPCredentialsControllerService import 
GCPCredentialsControllerService
-from minifi.controllers.ElasticsearchCredentialsService import 
ElasticsearchCredentialsService
 from minifi.controllers.ODBCService import ODBCService
 from minifi.controllers.KubernetesControllerService import 
KubernetesControllerService
 from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
@@ -512,25 +511,6 @@ def step_impl(context):
     context.test.acquire_container(context=context, name="fake-gcs-server", 
engine="fake-gcs-server")
 
 
-# elasticsearch
-@given('an Elasticsearch server is set up and running')
-@given('an Elasticsearch server is set up and a single document is present 
with "preloaded_id" in "my_index"')
-@given('an Elasticsearch server is set up and a single document is present 
with "preloaded_id" in "my_index" with "value1" in "field1"')
-def step_impl(context):
-    context.test.start_elasticsearch(context)
-    
context.test.create_doc_elasticsearch(context.test.get_container_name_with_postfix("elasticsearch"),
 "my_index", "preloaded_id")
-
-
-# opensearch
-@given('an Opensearch server is set up and running')
-@given('an Opensearch server is set up and a single document is present with 
"preloaded_id" in "my_index"')
-@given('an Opensearch server is set up and a single document is present with 
"preloaded_id" in "my_index" with "value1" in "field1"')
-def step_impl(context):
-    context.test.start_opensearch(context)
-    
context.test.add_elastic_user_to_opensearch(context.test.get_container_name_with_postfix("opensearch"))
-    
context.test.create_doc_elasticsearch(context.test.get_container_name_with_postfix("opensearch"),
 "my_index", "preloaded_id")
-
-
 def setUpSslContextServiceForProcessor(context, processor_name: str):
     minifi_crt_file = '/tmp/resources/minifi_client.crt'
     minifi_key_file = '/tmp/resources/minifi_client.key'
@@ -552,37 +532,6 @@ def setUpSslContextServiceForRPG(context, rpg_name: str):
     rpg.add_property("SSL Context Service", ssl_context_service.name)
 
 
-@given(u'a SSL context service is set up for PostElasticsearch and 
Elasticsearch')
-def step_impl(context):
-    setUpSslContextServiceForProcessor(context, "PostElasticsearch")
-
-
-@given(u'a SSL context service is set up for PostElasticsearch and Opensearch')
-def step_impl(context):
-    root_ca_crt_file = '/tmp/resources/root_ca.crt'
-    ssl_context_service = SSLContextService(ca_cert=root_ca_crt_file)
-    post_elasticsearch_json = 
context.test.get_node_by_name("PostElasticsearch")
-    post_elasticsearch_json.controller_services.append(ssl_context_service)
-    post_elasticsearch_json.set_property("SSL Context Service", 
ssl_context_service.name)
-
-
-@given(u'an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication')
-def step_impl(context):
-    elasticsearch_credential_service = ElasticsearchCredentialsService()
-    post_elasticsearch_json = 
context.test.get_node_by_name("PostElasticsearch")
-    
post_elasticsearch_json.controller_services.append(elasticsearch_credential_service)
-    post_elasticsearch_json.set_property("Elasticsearch Credentials Provider 
Service", elasticsearch_credential_service.name)
-
-
-@given(u'an ElasticsearchCredentialsService is set up for PostElasticsearch 
with ApiKey')
-def step_impl(context):
-    api_key = context.test.elastic_generate_apikey("elasticsearch")
-    elasticsearch_credential_service = ElasticsearchCredentialsService(api_key)
-    post_elasticsearch_json = 
context.test.get_node_by_name("PostElasticsearch")
-    
post_elasticsearch_json.controller_services.append(elasticsearch_credential_service)
-    post_elasticsearch_json.set_property("Elasticsearch Credentials Provider 
Service", elasticsearch_credential_service.name)
-
-
 # splunk hec
 @given("a Splunk HEC is set up and running")
 def step_impl(context):
@@ -970,26 +919,6 @@ def step_impl(context):
     context.test.check_all_prometheus_metric_types_are_defined_once()
 
 
-@then("Elasticsearch is empty")
-def step_impl(context):
-    
context.test.check_empty_elastic(context.test.get_container_name_with_postfix("elasticsearch"))
-
-
-@then(u'Elasticsearch has a document with "{doc_id}" in "{index}" that has 
"{value}" set in "{field}"')
-def step_impl(context, doc_id, index, value, field):
-    
context.test.check_elastic_field_value(context.test.get_container_name_with_postfix("elasticsearch"),
 index_name=index, doc_id=doc_id, field_name=field, field_value=value)
-
-
-@then("Opensearch is empty")
-def step_impl(context):
-    context.test.check_empty_elastic(f"opensearch-{context.feature_id}")
-
-
-@then(u'Opensearch has a document with "{doc_id}" in "{index}" that has 
"{value}" set in "{field}"')
-def step_impl(context, doc_id, index, value, field):
-    context.test.check_elastic_field_value(f"opensearch-{context.feature_id}", 
index_name=index, doc_id=doc_id, field_name=field, field_value=value)
-
-
 # MiNiFi C2 Server
 @given("ssl properties are set up for MiNiFi C2 server")
 def step_impl(context):
diff --git a/docker/test/integration/minifi/processors/PostElasticsearch.py 
b/docker/test/integration/minifi/processors/PostElasticsearch.py
deleted file mode 100644
index 11e506ff1..000000000
--- a/docker/test/integration/minifi/processors/PostElasticsearch.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class PostElasticsearch(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
-        super(PostElasticsearch, self).__init__(
-            context=context,
-            clazz='PostElasticsearch',
-            properties={
-                'Hosts': f'https://elasticsearch-{context.feature_id}:9200',
-                'Index': 'test',
-                'Identifier': '${filename}'
-            },
-            auto_terminate=['success', 'failure', 'error'],
-            schedule=schedule)
diff --git a/docker/test/integration/features/elasticsearch.feature 
b/extensions/elasticsearch/tests/features/elasticsearch.feature
similarity index 65%
rename from docker/test/integration/features/elasticsearch.feature
rename to extensions/elasticsearch/tests/features/elasticsearch.feature
index 182e016e2..4b9fde71f 100644
--- a/docker/test/integration/features/elasticsearch.feature
+++ b/extensions/elasticsearch/tests/features/elasticsearch.feature
@@ -16,25 +16,27 @@
 @ENABLE_ELASTICSEARCH
 Feature: Managing documents on Elasticsearch with PostElasticsearch
 
-  Background:
-    Given the content of "/tmp/output" is monitored
-
   Scenario Outline: MiNiFi instance indexes a document on Elasticsearch using 
Basic Authentication
     Given an Elasticsearch server is set up and running
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "{ "field1" : "value1" }" is present in 
"/tmp/input"
     And a PostElasticsearch processor
+    And PostElasticsearch is EVENT_DRIVEN
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://elasticsearch-${scenario_id}:9200"
     And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
     And the "Identifier" property of the PostElasticsearch processor is set to 
"my_id"
     And the "Action" property of the PostElasticsearch processor is set to 
<action>
-    And a SSL context service is set up for PostElasticsearch and Elasticsearch
-    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication
+    And the "Elasticsearch Credentials Provider Service" property of the 
PostElasticsearch processor is set to 
"ElasticsearchCredentialsControllerService"
+    And an ssl context service is set up for PostElasticsearch
+    And an ElasticsearchCredentialsControllerService is set up with Basic 
Authentication
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
     And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "{ "field1" : "value1" }" is placed in 
the monitored directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "{ "field1" : "value1" }" content in the 
"/tmp/output" directory in less than 20 seconds
     And Elasticsearch has a document with "my_id" in "my_index" that has 
"value1" set in "field1"
 
     Examples:
@@ -47,17 +49,22 @@ Feature: Managing documents on Elasticsearch with 
PostElasticsearch
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "hello world" is present in "/tmp/input"
     And a PostElasticsearch processor
+    And PostElasticsearch is EVENT_DRIVEN
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://elasticsearch-${scenario_id}:9200"
     And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
     And the "Identifier" property of the PostElasticsearch processor is set to 
"preloaded_id"
     And the "Action" property of the PostElasticsearch processor is set to 
"delete"
-    And a SSL context service is set up for PostElasticsearch and Elasticsearch
-    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with ApiKey
+    And the "Elasticsearch Credentials Provider Service" property of the 
PostElasticsearch processor is set to 
"ElasticsearchCredentialsControllerService"
+    And an ssl context service is set up for PostElasticsearch
+    And an ElasticsearchCredentialsControllerService is set up with ApiKey
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
     And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "hello world" is placed in the monitored 
directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "hello world" content in the 
"/tmp/output" directory in less than 20 seconds
     And Elasticsearch is empty
 
   Scenario: MiNiFi instance partially updates a document in Elasticsearch 
using Basic Authentication
@@ -65,16 +72,21 @@ Feature: Managing documents on Elasticsearch with 
PostElasticsearch
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "{ "field2" : "value2" }" is present in 
"/tmp/input"
     And a PostElasticsearch processor
+    And PostElasticsearch is EVENT_DRIVEN
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://elasticsearch-${scenario_id}:9200"
     And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
     And the "Identifier" property of the PostElasticsearch processor is set to 
"preloaded_id"
     And the "Action" property of the PostElasticsearch processor is set to 
"update"
-    And a SSL context service is set up for PostElasticsearch and Elasticsearch
-    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication
+    And the "Elasticsearch Credentials Provider Service" property of the 
PostElasticsearch processor is set to 
"ElasticsearchCredentialsControllerService"
+    And an ssl context service is set up for PostElasticsearch
+    And an ElasticsearchCredentialsControllerService is set up with Basic 
Authentication
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
     And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "{ "field2" : "value2" }" is placed in 
the monitored directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "{ "field2" : "value2" }" content in the 
"/tmp/output" directory in less than 20 seconds
     And Elasticsearch has a document with "preloaded_id" in "my_index" that 
has "value1" set in "field1"
     And Elasticsearch has a document with "preloaded_id" in "my_index" that 
has "value2" set in "field2"
diff --git 
a/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py 
b/extensions/elasticsearch/tests/features/environment.py
similarity index 60%
rename from 
docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
rename to extensions/elasticsearch/tests/features/environment.py
index 99592bf2a..ab35b52a5 100644
--- 
a/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
+++ b/extensions/elasticsearch/tests/features/environment.py
@@ -12,16 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ..core.ControllerService import ControllerService
+from minifi_test_framework.core.hooks import common_before_scenario
+from minifi_test_framework.core.hooks import common_after_scenario
 
 
-class ElasticsearchCredentialsService(ControllerService):
-    def __init__(self, api_key=None, name=None):
-        super(ElasticsearchCredentialsService, self).__init__(name=name)
+def before_scenario(context, scenario):
+    common_before_scenario(context, scenario)
 
-        self.service_class = 'ElasticsearchCredentialsControllerService'
-        if api_key is None:
-            self.properties['Username'] = "elastic"
-            self.properties['Password'] = "password"
-        else:
-            self.properties['API Key'] = api_key
+
+def after_scenario(context, scenario):
+    common_after_scenario(context, scenario)
diff --git a/docker/test/integration/features/opensearch.feature 
b/extensions/elasticsearch/tests/features/opensearch.feature
similarity index 68%
rename from docker/test/integration/features/opensearch.feature
rename to extensions/elasticsearch/tests/features/opensearch.feature
index f4e2fabb8..c042988f7 100644
--- a/docker/test/integration/features/opensearch.feature
+++ b/extensions/elasticsearch/tests/features/opensearch.feature
@@ -16,26 +16,27 @@
 @ENABLE_ELASTICSEARCH
 Feature: PostElasticsearch works on Opensearch (Opensearch doesnt support API 
Keys)
 
-  Background:
-    Given the content of "/tmp/output" is monitored
-
   Scenario Outline: MiNiFi instance creates a document on Opensearch using 
Basic Authentication
     Given an Opensearch server is set up and running
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "{ "field1" : "value1" }" is present in 
"/tmp/input"
     And a PostElasticsearch processor
-    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch-${feature_id}:9200"
+    And PostElasticsearch is EVENT_DRIVEN
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch-${scenario_id}:9200"
     And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
     And the "Identifier" property of the PostElasticsearch processor is set to 
"my_id"
     And the "Action" property of the PostElasticsearch processor is set to 
<action>
-    And a SSL context service is set up for PostElasticsearch and Opensearch
-    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication
+    And the "Elasticsearch Credentials Provider Service" property of the 
PostElasticsearch processor is set to 
"ElasticsearchCredentialsControllerService"
+    And an ssl context service is set up for PostElasticsearch
+    And an ElasticsearchCredentialsControllerService is set up with Basic 
Authentication
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
     And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "{ "field1" : "value1" }" is placed in 
the monitored directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "{ "field1" : "value1" }" content in the 
"/tmp/output" directory in less than 20 seconds
     And Opensearch has a document with "my_id" in "my_index" that has "value1" 
set in "field1"
 
     Examples:
@@ -48,18 +49,22 @@ Feature: PostElasticsearch works on Opensearch (Opensearch 
doesnt support API Ke
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "hello world" is present in "/tmp/input"
     And a PostElasticsearch processor
-    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch-${feature_id}:9200"
+    And PostElasticsearch is EVENT_DRIVEN
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch-${scenario_id}:9200"
     And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
     And the "Identifier" property of the PostElasticsearch processor is set to 
"preloaded_id"
     And the "Action" property of the PostElasticsearch processor is set to 
"delete"
-    And a SSL context service is set up for PostElasticsearch and Opensearch
-    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication
+    And the "Elasticsearch Credentials Provider Service" property of the 
PostElasticsearch processor is set to 
"ElasticsearchCredentialsControllerService"
+    And an ssl context service is set up for PostElasticsearch
+    And an ElasticsearchCredentialsControllerService is set up with Basic 
Authentication
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
     And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "hello world" is placed in the monitored 
directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "hello world" content in the 
"/tmp/output" directory in less than 20 seconds
     And Opensearch is empty
 
   Scenario: MiNiFi instance partially updates a document in Opensearch using 
Basic Authentication
@@ -67,17 +72,21 @@ Feature: PostElasticsearch works on Opensearch (Opensearch 
doesnt support API Ke
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "{ "field2" : "value2" }" is present in 
"/tmp/input"
     And a PostElasticsearch processor
-    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch-${feature_id}:9200"
+    And PostElasticsearch is EVENT_DRIVEN
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch-${scenario_id}:9200"
     And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
     And the "Identifier" property of the PostElasticsearch processor is set to 
"preloaded_id"
     And the "Action" property of the PostElasticsearch processor is set to 
"update"
-    And a SSL context service is set up for PostElasticsearch and Opensearch
-    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication
+    And the "Elasticsearch Credentials Provider Service" property of the 
PostElasticsearch processor is set to 
"ElasticsearchCredentialsControllerService"
+    And an ssl context service is set up for PostElasticsearch
+    And an ElasticsearchCredentialsControllerService is set up with Basic 
Authentication
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
     And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "{ "field2" : "value2" }" is placed in 
the monitored directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "{ "field2" : "value2" }" content in the 
"/tmp/output" directory in less than 20 seconds
     And Opensearch has a document with "preloaded_id" in "my_index" that has 
"value1" set in "field1"
     And Opensearch has a document with "preloaded_id" in "my_index" that has 
"value2" set in "field2"
diff --git a/docker/test/integration/resources/elasticsearch/elasticsearch.yml 
b/extensions/elasticsearch/tests/features/resources/elasticsearch.yml
similarity index 100%
rename from docker/test/integration/resources/elasticsearch/elasticsearch.yml
rename to extensions/elasticsearch/tests/features/resources/elasticsearch.yml
diff --git a/docker/test/integration/resources/opensearch/opensearch.yml 
b/extensions/elasticsearch/tests/features/resources/opensearch.yml
similarity index 100%
rename from docker/test/integration/resources/opensearch/opensearch.yml
rename to extensions/elasticsearch/tests/features/resources/opensearch.yml
diff --git 
a/extensions/elasticsearch/tests/features/steps/elastic_base_container.py 
b/extensions/elasticsearch/tests/features/steps/elastic_base_container.py
new file mode 100644
index 000000000..a2a1334cb
--- /dev/null
+++ b/extensions/elasticsearch/tests/features/steps/elastic_base_container.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi_test_framework.core.helpers import wait_for_condition, retry_check
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+
+
+class ElasticBaseContainer(Container):
+    def __init__(self, test_context: MinifiTestContext, image: str, 
container_name: str):
+        super().__init__(image, container_name, test_context.network)
+        self.user = None
+
+    def deploy(self, finished_str: str):
+        super().deploy()
+        return wait_for_condition(
+            condition=lambda: finished_str in self.get_logs(),
+            timeout_seconds=60,
+            bail_condition=lambda: self.exited,
+            context=None)
+
+    def create_doc_elasticsearch(self, index_name: str, doc_id: str):
+        (code, output) = self.exec_run(["/bin/bash", "-c",
+                                        "curl -s -u elastic:password -k -XPUT https://localhost:9200/" + index_name + "/_doc/"
+                                        + doc_id + " -H Content-Type:application/json -d'{\"field1\":\"value1\"}'"])
+        return code == 0 and ('"_id":"' + doc_id + '"') in output
+
+    def check_elastic_field_value(self, index_name: str, doc_id: str, 
field_name: str, field_value: str):
+        (code, output) = self.exec_run(["/bin/bash", "-c", "curl -s -u elastic:password -k -XGET https://localhost:9200/" + index_name + "/_doc/" + doc_id])
+        return code == 0 and (field_name + '":"' + field_value) in output
+
+    @retry_check()
+    def is_elasticsearch_empty(self):
+        (code, output) = self.exec_run(["curl", "-s", "-u", "elastic:password", "-k", "-XGET", "https://localhost:9200/_search"])
+        return code == 0 and '"hits":[]' in output
diff --git 
a/extensions/elasticsearch/tests/features/steps/elasticsearch_container.py 
b/extensions/elasticsearch/tests/features/steps/elasticsearch_container.py
new file mode 100644
index 000000000..3226f8da2
--- /dev/null
+++ b/extensions/elasticsearch/tests/features/steps/elasticsearch_container.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+from elastic_base_container import ElasticBaseContainer
+from pathlib import Path
+from OpenSSL import crypto
+from minifi_test_framework.core.ssl_utils import make_server_cert, 
make_cert_without_extended_usage
+from minifi_test_framework.containers.file import File
+from minifi_test_framework.containers.host_file import HostFile
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+
+
+class ElasticsearchContainer(ElasticBaseContainer):
+    def __init__(self, test_context: MinifiTestContext):
+        super().__init__(test_context, "elasticsearch:9.1.5", 
f"elasticsearch-{test_context.scenario_id}")
+
+        http_cert, http_key = make_server_cert(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+        transport_cert, transport_key = 
make_cert_without_extended_usage(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+
+        root_ca_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, 
cert=test_context.root_ca_cert)
+        self.files.append(File("/usr/share/elasticsearch/config/certs/", 
"root_ca.crt", root_ca_content, permissions=0o644))
+
+        http_cert_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, 
cert=http_cert)
+        self.files.append(File("/usr/share/elasticsearch/config/certs/", 
"elastic_http.crt", http_cert_content, permissions=0o644))
+
+        http_key_content = crypto.dump_privatekey(type=crypto.FILETYPE_PEM, 
pkey=http_key)
+        self.files.append(File("/usr/share/elasticsearch/config/certs/", 
"elastic_http.key", http_key_content, permissions=0o644))
+
+        transport_cert_content = 
crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=transport_cert)
+        self.files.append(File("/usr/share/elasticsearch/config/certs/", 
"elastic_transport.crt", transport_cert_content, permissions=0o644))
+
+        transport_key_content = 
crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=transport_key)
+        self.files.append(File("/usr/share/elasticsearch/config/certs/", 
"elastic_transport.key", transport_key_content, permissions=0o644))
+
+        features_dir = Path(__file__).resolve().parent.parent
+        
self.host_files.append(HostFile('/usr/share/elasticsearch/config/elasticsearch.yml',
 os.path.join(features_dir, "resources", "elasticsearch.yml")))
+
+        self.environment.append("ELASTIC_PASSWORD=password")
+
+    def deploy(self):
+        return super().deploy('"current.health":"GREEN"')
+
+    def elastic_generate_apikey(self):
+        (code, output) = self.exec_run(["/bin/bash", "-c",
+                                        "curl -s -u elastic:password -k -XPOST 
https://localhost:9200/_security/api_key -H Content-Type:application/json 
-d'{\"name\":\"my-api-key\",\"expiration\":\"1d\",\"role_descriptors\":{\"role-a\":
 {\"cluster\": [\"all\"],\"index\": [{\"names\": [\"my_index\"],\"privileges\": 
[\"all\"]}]}}}'"])
+        if code != 0:
+            return None
+        output_lines = output.splitlines()
+        result = json.loads(output_lines[-1])
+        return result["encoded"]
diff --git 
a/extensions/elasticsearch/tests/features/steps/opensearch_container.py 
b/extensions/elasticsearch/tests/features/steps/opensearch_container.py
new file mode 100644
index 000000000..40a510975
--- /dev/null
+++ b/extensions/elasticsearch/tests/features/steps/opensearch_container.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from elastic_base_container import ElasticBaseContainer
+from pathlib import Path
+from OpenSSL import crypto
+from minifi_test_framework.core.ssl_utils import make_server_cert
+from minifi_test_framework.containers.file import File
+from minifi_test_framework.containers.host_file import HostFile
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+
+
+class OpensearchContainer(ElasticBaseContainer):
+    def __init__(self, test_context: MinifiTestContext):
+        super().__init__(test_context, "opensearchproject/opensearch:2.6.0", 
f"opensearch-{test_context.scenario_id}")
+
+        admin_pem, admin_key = make_server_cert(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+
+        root_ca_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, 
cert=test_context.root_ca_cert)
+        self.files.append(File("/usr/share/opensearch/config/", "root-ca.pem", 
root_ca_content, permissions=0o644))
+
+        admin_pem_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, 
cert=admin_pem)
+        self.files.append(File("/usr/share/opensearch/config/", "admin.pem", 
admin_pem_content, permissions=0o644))
+
+        admin_key_content = crypto.dump_privatekey(type=crypto.FILETYPE_PEM, 
pkey=admin_key)
+        self.files.append(File("/usr/share/opensearch/config/", 
"admin-key.pem", admin_key_content, permissions=0o644))
+
+        features_dir = Path(__file__).resolve().parent.parent
+        
self.host_files.append(HostFile('/usr/share/opensearch/config/opensearch.yml', 
os.path.join(features_dir, "resources", "opensearch.yml")))
+
+    def deploy(self):
+        return super().deploy('Hot-reloading of audit configuration is 
enabled')
+
+    def add_elastic_user_to_opensearch(self):
+        (code, output) = self.exec_run(["/bin/bash", "-c",
+                                        f'curl -s -u admin:admin -k -XPUT 
https://{self.container_name}:9200/_plugins/_security/api/internalusers/elastic 
-H Content-Type:application/json 
-d\'{{"password":"password","backend_roles":["admin"]}}\''])
+        return code == 0 and '"status":"CREATED"' in output
diff --git a/extensions/elasticsearch/tests/features/steps/steps.py 
b/extensions/elasticsearch/tests/features/steps/steps.py
new file mode 100644
index 000000000..5ee8f8439
--- /dev/null
+++ b/extensions/elasticsearch/tests/features/steps/steps.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from behave import step, given, then
+
+from minifi_test_framework.steps import checking_steps        # noqa: F401
+from minifi_test_framework.steps import configuration_steps   # noqa: F401
+from minifi_test_framework.steps import core_steps            # noqa: F401
+from minifi_test_framework.steps import flow_building_steps   # noqa: F401
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from minifi_test_framework.minifi.controller_service import ControllerService
+from elasticsearch_container import ElasticsearchContainer
+from opensearch_container import OpensearchContainer
+
+
+@step('an Elasticsearch server is set up and running')
+@step('an Elasticsearch server is set up and a single document is present with 
"preloaded_id" in "my_index"')
+@step('an Elasticsearch server is set up and a single document is present with 
"preloaded_id" in "my_index" with "value1" in "field1"')
+def step_impl(context: MinifiTestContext):
+    context.containers["elasticsearch"] = ElasticsearchContainer(context)
+    context.containers["elasticsearch"].deploy()
+    context.containers["elasticsearch"].create_doc_elasticsearch("my_index", 
"preloaded_id")
+
+
+@given(u'an ElasticsearchCredentialsControllerService is set up with Basic 
Authentication')
+def step_impl(context: MinifiTestContext):
+    controller_service = 
ControllerService(class_name="ElasticsearchCredentialsControllerService", 
service_name="ElasticsearchCredentialsControllerService")
+    controller_service.add_property("Username", "elastic")
+    controller_service.add_property("Password", "password")
+    
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service)
+
+
+@given(u'an ElasticsearchCredentialsControllerService is set up with ApiKey')
+def step_impl(context: MinifiTestContext):
+    controller_service = 
ControllerService(class_name="ElasticsearchCredentialsControllerService", 
service_name="ElasticsearchCredentialsControllerService")
+    api_key = context.containers["elasticsearch"].elastic_generate_apikey()
+    controller_service.add_property("API Key", api_key)
+    
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service)
+
+
+@then(u'Elasticsearch has a document with "{doc_id}" in "{index}" that has 
"{value}" set in "{field}"')
+def step_impl(context: MinifiTestContext, doc_id: str, index: str, value: str, 
field: str):
+    assert 
context.containers["elasticsearch"].check_elastic_field_value(index_name=index, 
doc_id=doc_id, field_name=field, field_value=value)
+
+
+@then("Elasticsearch is empty")
+def step_impl(context):
+    assert context.containers["elasticsearch"].is_elasticsearch_empty()
+
+
+@given('an Opensearch server is set up and running')
+@given('an Opensearch server is set up and a single document is present with 
"preloaded_id" in "my_index"')
+@given('an Opensearch server is set up and a single document is present with 
"preloaded_id" in "my_index" with "value1" in "field1"')
+def step_impl(context):
+    context.containers["opensearch"] = OpensearchContainer(context)
+    context.containers["opensearch"].deploy()
+    context.containers["opensearch"].add_elastic_user_to_opensearch()
+    context.containers["opensearch"].create_doc_elasticsearch("my_index", 
"preloaded_id")
+
+
+@then(u'Opensearch has a document with "{doc_id}" in "{index}" that has 
"{value}" set in "{field}"')
+def step_impl(context: MinifiTestContext, doc_id: str, index: str, value: str, 
field: str):
+    assert 
context.containers["opensearch"].check_elastic_field_value(index_name=index, 
doc_id=doc_id, field_name=field, field_value=value)
+
+
+@then("Opensearch is empty")
+def step_impl(context):
+    assert context.containers["opensearch"].is_elasticsearch_empty()

Reply via email to