This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ca4fa86a29de25fc42880bc2e775ac23f5bfded3
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Jan 7 17:36:36 2026 +0100

    MINIFICPP-2674 Move Couchbase tests to modular docker tests
    
    Closes #2066
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
---
 .../containers/minifi_container.py                 |   6 +-
 .../src/minifi_test_framework/core/helpers.py      |  19 +++
 .../steps/flow_building_steps.py                   |  25 +++-
 docker/RunBehaveTests.sh                           |   3 +-
 docker/requirements.txt                            |   1 -
 docker/test/integration/cluster/ContainerStore.py  |   9 --
 .../test/integration/cluster/DockerTestCluster.py  |   5 -
 .../cluster/checkers/CouchbaseChecker.py           |  69 ---------
 .../cluster/containers/CouchbaseServerContainer.py | 125 ----------------
 .../features/MiNiFi_integration_test_driver.py     |   8 -
 docker/test/integration/features/steps/steps.py    |  49 ------
 .../minifi/controllers/CouchbaseClusterService.py  |  30 ----
 .../minifi/processors/PutCouchbaseKey.py           |  24 ---
 .../couchbase/tests}/features/couchbase.feature    |  98 +++++++-----
 .../couchbase/tests/features/environment.py        |  28 +++-
 .../features/steps/couchbase_server_container.py   | 165 +++++++++++++++++++++
 extensions/couchbase/tests/features/steps/steps.py |  67 +++++++++
 17 files changed, 363 insertions(+), 368 deletions(-)

diff --git 
a/behave_framework/src/minifi_test_framework/containers/minifi_container.py 
b/behave_framework/src/minifi_test_framework/containers/minifi_container.py
index 8678bdc0f..55d730957 100644
--- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py
+++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py
@@ -21,7 +21,7 @@ from OpenSSL import crypto
 from minifi_test_framework.core.minifi_test_context import MinifiTestContext
 from minifi_test_framework.containers.file import File
 from minifi_test_framework.minifi.flow_definition import FlowDefinition
-from minifi_test_framework.core.ssl_utils import 
make_cert_without_extended_usage
+from minifi_test_framework.core.ssl_utils import 
make_cert_without_extended_usage, make_client_cert
 from .container import Container
 
 
@@ -38,6 +38,10 @@ class MinifiContainer(Container):
         self.files.append(File("/tmp/resources/minifi_client.crt", 
crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert)))
         self.files.append(File("/tmp/resources/minifi_client.key", 
crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key)))
 
+        clientuser_cert, clientuser_key = make_client_cert("clientuser", 
ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
+        self.files.append(File("/tmp/resources/clientuser.crt", 
crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=clientuser_cert)))
+        self.files.append(File("/tmp/resources/clientuser.key", 
crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=clientuser_key)))
+
         self.is_fhs = 'MINIFI_INSTALLATION_TYPE=FHS' in 
str(self.client.images.get(test_context.minifi_container_image).history())
 
         self._fill_default_properties()
diff --git a/behave_framework/src/minifi_test_framework/core/helpers.py 
b/behave_framework/src/minifi_test_framework/core/helpers.py
index e6ae51857..7b7992056 100644
--- a/behave_framework/src/minifi_test_framework/core/helpers.py
+++ b/behave_framework/src/minifi_test_framework/core/helpers.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import logging
 import time
+import functools
 from collections.abc import Callable
 
 import docker
@@ -82,3 +83,21 @@ def run_cmd_in_docker_image(image_name: str, cmd: str | 
list, network: str) -> s
 
 def run_shell_cmd_in_docker_image(image_name: str, cmd: str, network: str) -> 
str:
     return run_cmd_in_docker_image(image_name, ["/bin/sh", "-c", cmd], network)
+
+
+def retry_check(max_tries: int = 5, retry_interval_seconds: int = 1):
+    """
+    Decorator for retrying a checker function that returns a boolean. The 
decorated function is called repeatedly until it returns True
+    or the maximum number of attempts is reached. The maximum number of 
attempts and the interval between attempts in seconds can be configured.
+    """
+    def retry_check_func(func):
+        @functools.wraps(func)
+        def retry_wrapper(*args, **kwargs):
+            for i in range(max_tries):
+                if func(*args, **kwargs):
+                    return True
+                if i < max_tries - 1:
+                    time.sleep(retry_interval_seconds)
+            return False
+        return retry_wrapper
+    return retry_check_func
diff --git 
a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py 
b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
index 22c3c604d..11e19d104 100644
--- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
@@ -29,6 +29,11 @@ from minifi_test_framework.minifi.parameter_context import 
ParameterContext
 from minifi_test_framework.minifi.processor import Processor
 
 
+@given("a MiNiFi CPP server with yaml config")
+def step_impl(context: MinifiTestContext):
+    pass  # TODO(lordgamez): Needs to be implemented after JSON config is set 
to be default
+
+
 @given("a transient MiNiFi flow with a LogOnDestructionProcessor processor")
 def step_impl(context: MinifiTestContext):
     context.get_or_create_default_minifi_container().command = ["/bin/sh", 
"-c", "timeout 10s ./bin/minifi.sh run && sleep 100"]
@@ -140,6 +145,7 @@ def step_impl(context: MinifiTestContext, parameter_name: 
str, parameter_value:
 
 
 @step('a directory at "{directory}" has a file with the content "{content}" in 
the "{flow_name}" flow')
+@step("a directory at '{directory}' has a file with the content '{content}' in 
the '{flow_name}' flow")
 def step_impl(context: MinifiTestContext, directory: str, content: str, 
flow_name: str):
     new_content = content.replace("\\n", "\n")
     new_dir = Directory(directory)
@@ -148,6 +154,7 @@ def step_impl(context: MinifiTestContext, directory: str, 
content: str, flow_nam
 
 
 @step('a directory at "{directory}" has a file with the content "{content}"')
+@step("a directory at '{directory}' has a file with the content '{content}'")
 def step_impl(context: MinifiTestContext, directory: str, content: str):
     context.execute_steps(f'given a directory at "{directory}" has a file with 
the content "{content}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow')
 
@@ -240,14 +247,22 @@ def step_impl(context: MinifiTestContext, property_name: 
str, processor_name: st
 
 
 # TLS
-@given("an ssl context service is set up for {processor_name}")
-@given("an ssl context service with a manual CA cert file is set up for 
{processor_name}")
-def step_impl(context, processor_name):
+def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: 
str):
     controller_service = ControllerService(class_name="SSLContextService", 
service_name="SSLContextService")
-    controller_service.add_property("Client Certificate", 
"/tmp/resources/minifi_client.crt")
-    controller_service.add_property("Private Key", 
"/tmp/resources/minifi_client.key")
+    controller_service.add_property("Client Certificate", 
f"/tmp/resources/{cert_name}.crt")
+    controller_service.add_property("Private Key", 
f"/tmp/resources/{cert_name}.key")
     controller_service.add_property("CA Certificate", 
"/tmp/resources/root_ca.crt")
     
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service)
 
+
+@given("an ssl context service is set up")
+def step_impl(context: MinifiTestContext):
+    add_ssl_context_service_for_minifi(context, "minifi_client")
+
+
+@given("an ssl context service is set up for {processor_name}")
+@given("an ssl context service with a manual CA cert file is set up for 
{processor_name}")
+def step_impl(context, processor_name):
+    add_ssl_context_service_for_minifi(context, "minifi_client")
     processor = 
context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name)
     processor.add_property('SSL Context Service', 'SSLContextService')
diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh
index 786e9e6b3..2f9f4b949 100755
--- a/docker/RunBehaveTests.sh
+++ b/docker/RunBehaveTests.sh
@@ -200,4 +200,5 @@ exec \
     "${docker_dir}/../extensions/sql/tests/features" \
     "${docker_dir}/../extensions/llamacpp/tests/features" \
     "${docker_dir}/../extensions/opc/tests/features" \
-    "${docker_dir}/../extensions/kafka/tests/features"
+    "${docker_dir}/../extensions/kafka/tests/features" \
+    "${docker_dir}/../extensions/couchbase/tests/features"
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 2a2da597a..77c4c2fdc 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -10,5 +10,4 @@ azure-storage-blob==12.24.1
 prometheus-api-client==0.5.5
 humanfriendly==10.0
 requests<2.29  # https://github.com/docker/docker-py/issues/3113
-couchbase==4.3.5
 paho-mqtt==2.1.0
diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index 31e1be1b5..f225e1864 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -36,7 +36,6 @@ from .containers.MinifiC2ServerContainer import 
MinifiC2ServerContainer
 from .containers.GrafanaLokiContainer import GrafanaLokiContainer
 from .containers.GrafanaLokiContainer import GrafanaLokiOptions
 from .containers.ReverseProxyContainer import ReverseProxyContainer
-from .containers.CouchbaseServerContainer import CouchbaseServerContainer
 from .FeatureContext import FeatureContext
 
 
@@ -266,14 +265,6 @@ class ContainerStore:
                                                                     
network=self.network,
                                                                     
image_store=self.image_store,
                                                                     
command=command))
-        elif engine == "couchbase-server":
-            return self.containers.setdefault(container_name,
-                                              
CouchbaseServerContainer(feature_context=feature_context,
-                                                                       
name=container_name,
-                                                                       
vols=self.vols,
-                                                                       
network=self.network,
-                                                                       
image_store=self.image_store,
-                                                                       
command=command))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index 58902f132..2d0aa4aad 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -35,7 +35,6 @@ from .checkers.PrometheusChecker import PrometheusChecker
 from .checkers.SplunkChecker import SplunkChecker
 from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
 from .checkers.ModbusChecker import ModbusChecker
-from .checkers.CouchbaseChecker import CouchbaseChecker
 from .checkers.MqttHelper import MqttHelper
 from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, 
retry_check
 
@@ -56,7 +55,6 @@ class DockerTestCluster:
         self.grafana_loki_checker = GrafanaLokiChecker()
         self.minifi_controller_executor = 
MinifiControllerExecutor(self.container_communicator)
         self.modbus_checker = ModbusChecker(self.container_communicator)
-        self.couchbase_checker = CouchbaseChecker()
         self.mqtt_helper = MqttHelper()
 
     def cleanup(self):
@@ -447,8 +445,5 @@ class DockerTestCluster:
     def enable_ssl_in_nifi(self):
         self.container_store.enable_ssl_in_nifi()
 
-    def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, 
expected_data: str, expected_data_type: str):
-        return self.couchbase_checker.is_data_present_in_couchbase(doc_id, 
bucket_name, expected_data, expected_data_type)
-
     def publish_test_mqtt_message(self, topic: str, message: str):
         self.mqtt_helper.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/cluster/checkers/CouchbaseChecker.py 
b/docker/test/integration/cluster/checkers/CouchbaseChecker.py
deleted file mode 100644
index 07a332cbf..000000000
--- a/docker/test/integration/cluster/checkers/CouchbaseChecker.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import json
-from couchbase.cluster import Cluster
-from couchbase.options import ClusterOptions
-from couchbase.auth import PasswordAuthenticator
-from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder
-
-
-class CouchbaseChecker:
-    def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, 
expected_data: str, expected_data_type: str):
-        try:
-            cluster = Cluster('couchbase://localhost', ClusterOptions(
-                PasswordAuthenticator('Administrator', 'password123')))
-
-            bucket = cluster.bucket(bucket_name)
-            collection = bucket.default_collection()
-
-            if expected_data_type.lower() == "binary":
-                binary_flag = 0x03 << 24
-                result = collection.get(doc_id, 
transcoder=RawBinaryTranscoder())
-                flags = result.flags
-                if not flags & binary_flag:
-                    logging.error(f"Expected binary data for document 
'{doc_id}' but no binary flags were found.")
-                    return False
-
-                content = result.content_as[bytes]
-                return content.decode('utf-8') == expected_data
-
-            if expected_data_type.lower() == "json":
-                json_flag = 0x02 << 24
-                result = collection.get(doc_id)
-                flags = result.flags
-                if not flags & json_flag:
-                    logging.error(f"Expected JSON data for document '{doc_id}' 
but no JSON flags were found.")
-                    return False
-
-                content = result.content_as[dict]
-                return content == json.loads(expected_data)
-
-            if expected_data_type.lower() == "string":
-                string_flag = 0x04 << 24
-                result = collection.get(doc_id, 
transcoder=RawStringTranscoder())
-                flags = result.flags
-                if not flags & string_flag:
-                    logging.error(f"Expected string data for document 
'{doc_id}' but no string flags were found.")
-                    return False
-
-                content = result.content_as[str]
-                return content == expected_data
-
-            logging.error(f"Unsupported data type '{expected_data_type}'")
-            return False
-        except Exception as e:
-            logging.error(f"Error while fetching document '{doc_id}' from 
bucket '{bucket_name}': {e}")
-            return False
diff --git 
a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py 
b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
deleted file mode 100644
index b2f13b732..000000000
--- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import os
-import OpenSSL.crypto
-import tempfile
-import docker
-import requests
-import logging
-from requests.auth import HTTPBasicAuth
-from .Container import Container
-from utils import retry_check
-from ssl_utils.SSL_cert_utils import make_server_cert
-
-
-class CouchbaseServerContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, 
command=None):
-        super().__init__(feature_context, name, "couchbase-server", vols, 
network, image_store, command)
-        couchbase_cert, couchbase_key = 
make_server_cert(f"couchbase-server-{feature_context.id}", 
feature_context.root_ca_cert, feature_context.root_ca_key)
-
-        self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=feature_context.root_ca_cert))
-        self.root_ca_file.close()
-        os.chmod(self.root_ca_file.name, 0o666)
-
-        self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=couchbase_cert))
-        self.couchbase_cert_file.close()
-        os.chmod(self.couchbase_cert_file.name, 0o666)
-
-        self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False)
-        
self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
 pkey=couchbase_key))
-        self.couchbase_key_file.close()
-        os.chmod(self.couchbase_key_file.name, 0o666)
-
-    def get_startup_finished_log_entry(self):
-        # after startup the logs are only available in the container, only 
this message is shown
-        return "logs available in"
-
-    @retry_check(max_tries=12, retry_interval=5)
-    def _run_couchbase_cli_command(self, command):
-        (code, _) = self.client.containers.get(self.name).exec_run(command)
-        if code != 0:
-            logging.error(f"Failed to run command '{command}', returned error 
code: {code}")
-            return False
-        return True
-
-    def _run_couchbase_cli_commands(self, commands):
-        return all(self._run_couchbase_cli_command(command) for command in 
commands)
-
-    @retry_check(max_tries=15, retry_interval=2)
-    def _load_couchbase_certs(self):
-        response = 
requests.post("http://localhost:8091/node/controller/loadTrustedCAs";, 
auth=HTTPBasicAuth("Administrator", "password123"))
-        if response.status_code != 200:
-            logging.error(f"Failed to load CA certificates, with status code: 
{response.status_code}")
-            return False
-
-        response = 
requests.post("http://localhost:8091/node/controller/reloadCertificate";, 
auth=HTTPBasicAuth("Administrator", "password123"))
-        if response.status_code != 200:
-            logging.error(f"Failed to reload certificates, with status code: 
{response.status_code}")
-            return False
-
-        return True
-
-    def run_post_startup_commands(self):
-        if self.post_startup_commands_finished:
-            return True
-
-        commands = [
-            ["couchbase-cli", "cluster-init", "-c", "localhost", 
"--cluster-username", "Administrator", "--cluster-password", "password123", 
"--services", "data,index,query",
-             "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
-            ["couchbase-cli", "bucket-create", "-c", "localhost", 
"--username", "Administrator", "--password", "password123", "--bucket", 
"test_bucket", "--bucket-type", "couchbase",
-             "--bucket-ramsize", "1024", "--max-ttl", "36000"],
-            ["couchbase-cli", "user-manage", "-c", "localhost", "-u", 
"Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", 
"--rbac-password", "password123",
-             "--roles", "data_reader[test_bucket],data_writer[test_bucket]", 
"--auth-domain", "local"],
-            ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", 
"prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
-            ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 
'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
-        ]
-        if not self._run_couchbase_cli_commands(commands):
-            return False
-
-        if not self._load_couchbase_certs():
-            return False
-
-        self.post_startup_commands_finished = True
-        return True
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        mounts = [
-            docker.types.Mount(
-                type='bind',
-                source=self.couchbase_key_file.name,
-                target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'),
-            docker.types.Mount(
-                type='bind',
-                source=self.couchbase_cert_file.name,
-                target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'),
-            docker.types.Mount(
-                type='bind',
-                source=self.root_ca_file.name,
-                target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt')
-        ]
-
-        self.docker_container = self.client.containers.run(
-            "couchbase:enterprise-7.2.5",
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            ports={'8091/tcp': 8091, '11210/tcp': 11210},
-            entrypoint=self.command,
-            mounts=mounts)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 9e07421c6..f04d0b6dc 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -85,11 +85,6 @@ class MiNiFi_integration_test:
         self.cluster.deploy_container('minifi-c2-server')
         assert 
self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or 
self.cluster.log_app_output()
 
-    def start_couchbase_server(self, context):
-        self.cluster.acquire_container(context=context, 
name='couchbase-server', engine='couchbase-server')
-        self.cluster.deploy_container('couchbase-server')
-        assert 
self.cluster.wait_for_container_startup_to_finish('couchbase-server') or 
self.cluster.log_app_output()
-
     def start_nifi(self, context):
         self.cluster.acquire_container(context=context, name='nifi', 
engine='nifi')
         self.cluster.deploy_container('nifi')
@@ -507,8 +502,5 @@ class MiNiFi_integration_test:
     def enable_ssl_in_nifi(self):
         self.cluster.enable_ssl_in_nifi()
 
-    def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: 
str, expected_data: str, expected_data_type: str):
-        assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, 
expected_data, expected_data_type)
-
     def publish_test_mqtt_message(self, topic, message):
         self.cluster.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index 9483b0580..4f06d8b7e 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -27,7 +27,6 @@ from minifi.controllers.JsonRecordSetWriter import 
JsonRecordSetWriter
 from minifi.controllers.JsonTreeReader import JsonTreeReader
 from minifi.controllers.XMLReader import XMLReader
 from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter
-from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
 from minifi.controllers.XMLReader import XMLReader
 
 from behave import given, then, when
@@ -1297,54 +1296,6 @@ def step_impl(context, parameter_context_name):
     container.set_parameter_context_name(parameter_context_name)
 
 
-# Couchbase
-@when(u'a Couchbase server is started')
-def step_impl(context):
-    context.test.start_couchbase_server(context)
-
-
-@given("a CouchbaseClusterService is setup up with the name 
\"{service_name}\"")
-def step_impl(context, service_name):
-    couchbase_cluster_controller_service = CouchbaseClusterService(
-        name=service_name,
-        
connection_string="couchbase://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")))
-    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
-    container.add_controller(couchbase_cluster_controller_service)
-
-
-@given("a CouchbaseClusterService is set up up with SSL connection with the 
name \"{service_name}\"")
-def step_impl(context, service_name):
-    ssl_context_service = SSLContextService(name="SSLContextService",
-                                            
ca_cert='/tmp/resources/root_ca.crt')
-    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
-    container.add_controller(ssl_context_service)
-    couchbase_cluster_controller_service = CouchbaseClusterService(
-        name=service_name,
-        
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
-        ssl_context_service=ssl_context_service)
-    container.add_controller(couchbase_cluster_controller_service)
-
-
-@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present 
with data '{data}' of type \"{data_type}\" in Couchbase")
-def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: 
str):
-    context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, 
data_type)
-
-
-@given("a CouchbaseClusterService is setup up using mTLS authentication with 
the name \"{service_name}\"")
-def step_impl(context, service_name):
-    ssl_context_service = SSLContextService(name="SSLContextService",
-                                            
cert='/tmp/resources/clientuser.crt',
-                                            
key='/tmp/resources/clientuser.key',
-                                            
ca_cert='/tmp/resources/root_ca.crt')
-    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
-    container.add_controller(ssl_context_service)
-    couchbase_cluster_controller_service = CouchbaseClusterService(
-        name=service_name,
-        
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
-        ssl_context_service=ssl_context_service)
-    container.add_controller(couchbase_cluster_controller_service)
-
-
 @given("a LlamaCpp model is present on the MiNiFi host")
 def step_impl(context):
     context.test.llama_model_is_downloaded_in_minifi()
diff --git 
a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py 
b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
deleted file mode 100644
index 94494fe17..000000000
--- a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.ControllerService import ControllerService
-
-
-class CouchbaseClusterService(ControllerService):
-    def __init__(self, name, connection_string, ssl_context_service=None):
-        super(CouchbaseClusterService, self).__init__(name=name)
-
-        self.service_class = 'CouchbaseClusterService'
-        self.properties['Connection String'] = connection_string
-        if ssl_context_service:
-            self.linked_services.append(ssl_context_service)
-        if not ssl_context_service or ssl_context_service and 'Client 
Certificate' not in ssl_context_service.properties:
-            self.properties['User Name'] = "Administrator"
-            self.properties['User Password'] = "password123"
diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py 
b/docker/test/integration/minifi/processors/PutCouchbaseKey.py
deleted file mode 100644
index 341338771..000000000
--- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from ..core.Processor import Processor
-
-
-class PutCouchbaseKey(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
-        super(PutCouchbaseKey, self).__init__(
-            context=context,
-            clazz='PutCouchbaseKey',
-            auto_terminate=['success', 'failure', 'retry'],
-            schedule=schedule)
diff --git a/docker/test/integration/features/couchbase.feature 
b/extensions/couchbase/tests/features/couchbase.feature
similarity index 81%
rename from docker/test/integration/features/couchbase.feature
rename to extensions/couchbase/tests/features/couchbase.feature
index ecf4ce8a1..3464f0410 100644
--- a/docker/test/integration/features/couchbase.feature
+++ b/extensions/couchbase/tests/features/couchbase.feature
@@ -15,26 +15,27 @@
 
 @ENABLE_COUCHBASE
 Feature: Executing Couchbase operations from MiNiFi-C++
-  Background:
-    Given the content of "/tmp/output" is monitored
 
   Scenario: A MiNiFi instance can insert json data to test bucket with 
PutCouchbaseKey processor
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the PutCouchbaseKey processor is set 
to "Json"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+    And a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
     And the "retry" relationship of the PutCouchbaseKey processor is connected 
to the PutCouchbaseKey
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
     Then the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
@@ -46,21 +47,24 @@ Feature: Executing Couchbase operations from MiNiFi-C++
 
   Scenario: A MiNiFi instance can insert binary data to test bucket with 
PutCouchbaseKey processor
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1"}' is present in 
'/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the PutCouchbaseKey processor is set 
to "Binary"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+    And a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
     And the "retry" relationship of the PutCouchbaseKey processor is connected 
to the PutCouchbaseKey
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
     Then the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
@@ -72,16 +76,20 @@ Feature: Executing Couchbase operations from MiNiFi-C++
 
   Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And GetCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
     And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+    And a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
@@ -89,11 +97,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
     And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
-    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 100 seconds
+    Then a file with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the '/tmp/output' directory in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
@@ -101,18 +110,22 @@ Feature: Executing Couchbase operations from MiNiFi-C++
 
   Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor using binary storage
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the PutCouchbaseKey processor is set 
to "Binary"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And GetCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the GetCouchbaseKey processor is set 
to "Binary"
     And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+    And a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
@@ -120,11 +133,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
     And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
-    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 100 seconds
+    Then a file with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the '/tmp/output' directory in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
@@ -132,19 +146,23 @@ Feature: Executing Couchbase operations from MiNiFi-C++
 
   Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor and put the result in an attribute
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the PutCouchbaseKey processor is set 
to "String"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And GetCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the GetCouchbaseKey processor is set 
to "String"
     And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
     And the "Put Value to Attribute" property of the GetCouchbaseKey processor 
is set to "get_couchbase_result"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+    And a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
@@ -152,11 +170,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
     And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
-    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 100 seconds
+    Then a file with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the '/tmp/output' directory in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
@@ -165,24 +184,27 @@ Feature: Executing Couchbase operations from MiNiFi-C++
 
   Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on 
Couchbase value type mismatch
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the PutCouchbaseKey processor is set 
to "String"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And GetCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
     And the "Document Type" property of the GetCouchbaseKey processor is set 
to "Binary"
     And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
-    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+    And a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
     And the "retry" relationship of the PutCouchbaseKey processor is connected 
to the PutCouchbaseKey
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+    And GetCouchbaseKey's failure relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
     Then the Minifi logs contain the following message: "Failed to get content 
for document 'test_doc_id' from collection 'test_bucket._default._default' with 
the following exception: 'raw_binary_transcoder expects document to have BINARY 
common flags" in less than 100 seconds
 
@@ -190,16 +212,21 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And the "Keep Source File" property of the GetFile processor is set to 
"true"
     And the scheduling period of the GetFile processor is set to "20 seconds"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And GetCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
     And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is set up up with SSL connection with the 
name "CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+
+    And a CouchbaseClusterService is set up using SSL connection
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
@@ -207,29 +234,33 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
     And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
-    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 100 seconds
+    Then a file with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the '/tmp/output' directory in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
 
   Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor using mTLS authentication
-    Given a MiNiFi CPP server with yaml config
-    And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
-    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a directory at '/tmp/input' has a file with the content '{"field1": 
"value1", "field2": "value2"}'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And PutCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
     And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
     And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And GetCouchbaseKey is EVENT_DRIVEN
     And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
     And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And a CouchbaseClusterService is setup up using mTLS authentication with 
the name "CouchbaseClusterService"
+    And LogAttribute is EVENT_DRIVEN
+    And a CouchbaseClusterService is setup up using mTLS authentication
 
     And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
     And the "failure" relationship of the PutCouchbaseKey processor is 
connected to the PutCouchbaseKey
@@ -237,11 +268,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
     And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
 
     When a Couchbase server is started
-    And all instances start up
+    And the MiNiFi instance starts up
 
-    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 100 seconds
+    Then a file with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the '/tmp/output' directory in less than 100 seconds
     And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
     And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
diff --git a/docker/test/integration/minifi/processors/GetCouchbaseKey.py 
b/extensions/couchbase/tests/features/environment.py
similarity index 53%
rename from docker/test/integration/minifi/processors/GetCouchbaseKey.py
rename to extensions/couchbase/tests/features/environment.py
index 0b48dcbdd..01c4d4791 100644
--- a/docker/test/integration/minifi/processors/GetCouchbaseKey.py
+++ b/extensions/couchbase/tests/features/environment.py
@@ -12,13 +12,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ..core.Processor import Processor
+from minifi_test_framework.containers.docker_image_builder import 
DockerImageBuilder
+from minifi_test_framework.core.hooks import common_before_scenario
+from minifi_test_framework.core.hooks import common_after_scenario
 
 
-class GetCouchbaseKey(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
-        super(GetCouchbaseKey, self).__init__(
-            context=context,
-            clazz='GetCouchbaseKey',
-            auto_terminate=['success', 'failure', 'retry'],
-            schedule=schedule)
+def before_all(context):
+    dockerfile = """
+    FROM python:3.13-slim-bookworm
+    RUN pip install couchbase==4.3.5 requests"""
+    builder = DockerImageBuilder(
+        image_tag="minifi-couchbase-helper:latest",
+        dockerfile_content=dockerfile
+    )
+    builder.build()
+
+
+def before_scenario(context, scenario):
+    common_before_scenario(context, scenario)
+
+
+def after_scenario(context, scenario):
+    common_after_scenario(context, scenario)
diff --git 
a/extensions/couchbase/tests/features/steps/couchbase_server_container.py 
b/extensions/couchbase/tests/features/steps/couchbase_server_container.py
new file mode 100644
index 000000000..96cddf821
--- /dev/null
+++ b/extensions/couchbase/tests/features/steps/couchbase_server_container.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from OpenSSL import crypto
+from minifi_test_framework.core.helpers import wait_for_condition, retry_check
+from minifi_test_framework.core.ssl_utils import make_server_cert
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.containers.file import File
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from docker.errors import ContainerError
+
+
+class CouchbaseServerContainer(Container):
+    def __init__(self, test_context: MinifiTestContext):
+        super().__init__("couchbase:enterprise-7.2.5", 
f"couchbase-server-{test_context.scenario_id}", test_context.network)
+
+        couchbase_cert, couchbase_key = make_server_cert(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+
+        root_ca_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, 
cert=test_context.root_ca_cert)
+        
self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt", 
root_ca_content, permissions=0o666))
+        couchbase_cert_content = 
crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=couchbase_cert)
+        
self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/chain.pem", 
couchbase_cert_content, permissions=0o666))
+        couchbase_key_content = 
crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=couchbase_key)
+        
self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/pkey.key", 
couchbase_key_content, permissions=0o666))
+
+    def deploy(self):
+        super().deploy()
+        finished_str = "logs available in"
+        assert wait_for_condition(
+            condition=lambda: finished_str in self.get_logs(),
+            timeout_seconds=15,
+            bail_condition=lambda: self.exited,
+            context=None)
+        return self.run_post_startup_commands()
+
+    def run_post_startup_commands(self):
+        commands = [
+            ["couchbase-cli", "cluster-init", "-c", "localhost", 
"--cluster-username", "Administrator", "--cluster-password", "password123", 
"--services", "data,index,query",
+             "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
+            ["couchbase-cli", "bucket-create", "-c", "localhost", 
"--username", "Administrator", "--password", "password123", "--bucket", 
"test_bucket", "--bucket-type", "couchbase",
+             "--bucket-ramsize", "1024", "--max-ttl", "36000"],
+            ["couchbase-cli", "user-manage", "-c", "localhost", "-u", 
"Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", 
"--rbac-password", "password123",
+             "--roles", "data_reader[test_bucket],data_writer[test_bucket]", 
"--auth-domain", "local"],
+            ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", 
"prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
+            ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 
'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
+        ]
+        if not self._run_couchbase_cli_commands(commands):
+            return False
+
+        if not self._load_couchbase_certs():
+            return False
+
+        return True
+
+    @retry_check(max_tries=12, retry_interval=5)
+    def _run_couchbase_cli_command(self, command):
+        (code, output) = self.exec_run(command)
+        if code != 0:
+            logging.error(f"Failed to run command '{command}', returned error 
code: {code}, output: '{output}'")
+            return False
+        return True
+
+    def _run_couchbase_cli_commands(self, commands):
+        return all(self._run_couchbase_cli_command(command) for command in 
commands)
+
+    def _run_python_in_couchbase_helper_docker(self, command: str):
+        try:
+            self.client.containers.run("minifi-couchbase-helper:latest", 
["python", "-c", command], remove=True, stdout=True, stderr=True, 
network=self.network.name)
+            return True
+        except ContainerError as e:
+            stdout = e.stdout.decode("utf-8", errors="replace") if e.stdout 
else ""
+            stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr 
else ""
+            logging.error(f"Python command '{command}' failed in couchbase 
helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'")
+            return False
+        except Exception as e:
+            logging.error(f"Unexpected error while running python command 
'{command}' in couchbase helper docker: '{e}'")
+            return False
+
+    @retry_check(max_tries=15, retry_interval=2)
+    def _load_couchbase_certs(self):
+        python_command = f"""
+import requests
+import sys
+from requests.auth import HTTPBasicAuth
+response = 
requests.post(f"http://{self.container_name}:8091/node/controller/loadTrustedCAs";,
 auth=HTTPBasicAuth("Administrator", "password123"))
+if response.status_code != 200:
+    sys.exit(1)
+
+response = 
requests.post(f"http://{self.container_name}:8091/node/controller/reloadCertificate";,
 auth=HTTPBasicAuth("Administrator", "password123"))
+if response.status_code != 200:
+    sys.exit(1)
+sys.exit(0)
+        """
+        return self._run_python_in_couchbase_helper_docker(python_command)
+
+    def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, 
expected_data: str, expected_data_type: str):
+        python_command = f"""
+from couchbase.cluster import Cluster
+from couchbase.options import ClusterOptions
+from couchbase.auth import PasswordAuthenticator
+from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder
+import json
+import sys
+
+try:
+    cluster = Cluster("couchbase://{self.container_name}", ClusterOptions(
+        PasswordAuthenticator('Administrator', 'password123')))
+
+    bucket = cluster.bucket("{bucket_name}")
+    collection = bucket.default_collection()
+
+    if "{expected_data_type}".lower() == "binary":
+        binary_flag = 0x03 << 24
+        result = collection.get("{doc_id}", transcoder=RawBinaryTranscoder())
+        flags = result.flags
+        if not flags & binary_flag:
+            print("Expected binary data for document '{doc_id}' but no binary 
flags were found.")
+            sys.exit(1)
+
+        content = result.content_as[bytes]
+        if content.decode('utf-8') == '{expected_data}':
+            sys.exit(0)
+
+    if "{expected_data_type}".lower() == "json":
+        json_flag = 0x02 << 24
+        result = collection.get("{doc_id}")
+        flags = result.flags
+        if not flags & json_flag:
+            print("Expected JSON data for document '{doc_id}' but no JSON 
flags were found.")
+            sys.exit(1)
+
+        content = result.content_as[dict]
+        if content == json.loads('{expected_data}'):
+            sys.exit(0)
+    if "{expected_data_type}".lower() == "string":
+        string_flag = 0x04 << 24
+        result = collection.get("{doc_id}", transcoder=RawStringTranscoder())
+        flags = result.flags
+        if not flags & string_flag:
+            print("Expected string data for document '{doc_id}' but no string 
flags were found.")
+            sys.exit(1)
+
+        content = result.content_as[str]
+        if content == '{expected_data}':
+            sys.exit(0)
+
+        sys.exit(1)
+except Exception as e:
+    sys.exit(1)
+        """
+        return self._run_python_in_couchbase_helper_docker(python_command)
diff --git a/extensions/couchbase/tests/features/steps/steps.py 
b/extensions/couchbase/tests/features/steps/steps.py
new file mode 100644
index 000000000..f20cfda2e
--- /dev/null
+++ b/extensions/couchbase/tests/features/steps/steps.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from behave import step, then
+
+from minifi_test_framework.steps import checking_steps        # noqa: F401
+from minifi_test_framework.steps import configuration_steps   # noqa: F401
+from minifi_test_framework.steps import core_steps            # noqa: F401
+from minifi_test_framework.steps import flow_building_steps   # noqa: F401
+from minifi_test_framework.steps.flow_building_steps import 
add_ssl_context_service_for_minifi
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from minifi_test_framework.core.helpers import log_due_to_failure
+from minifi_test_framework.minifi.controller_service import ControllerService
+from couchbase_server_container import CouchbaseServerContainer
+
+
+@step("a Couchbase server is started")
+def step_impl(context: MinifiTestContext):
+    context.containers["couchbase-server"] = CouchbaseServerContainer(context)
+    assert context.containers["couchbase-server"].deploy()
+
+
+@step("a CouchbaseClusterService controller service is set up to communicate 
with the Couchbase server")
+def step_impl(context: MinifiTestContext):
+    controller_service = 
ControllerService(class_name="CouchbaseClusterService", 
service_name="CouchbaseClusterService")
+    controller_service.add_property("Connection String", 
f"couchbase://couchbase-server-{context.scenario_id}")
+    controller_service.add_property("User Name", "Administrator")
+    controller_service.add_property("User Password", "password123")
+    
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service)
+
+
+@step("a CouchbaseClusterService is set up using SSL connection")
+def step_impl(context):
+    ssl_context_service = ControllerService(class_name="SSLContextService", 
service_name="SSLContextService")
+    ssl_context_service.add_property("CA Certificate", 
"/tmp/resources/root_ca.crt")
+    
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(ssl_context_service)
+    couchbase_cluster_service = 
ControllerService(class_name="CouchbaseClusterService", 
service_name="CouchbaseClusterService")
+    couchbase_cluster_service.add_property("Connection String", 
f"couchbases://couchbase-server-{context.scenario_id}")
+    couchbase_cluster_service.add_property("User Name", "Administrator")
+    couchbase_cluster_service.add_property("User Password", "password123")
+    couchbase_cluster_service.add_property("Linked Services", 
"SSLContextService")
+    
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(couchbase_cluster_service)
+
+
+@step("a CouchbaseClusterService is setup up using mTLS authentication")
+def step_impl(context: MinifiTestContext):
+    add_ssl_context_service_for_minifi(context, "clientuser")
+    controller_service = 
ControllerService(class_name="CouchbaseClusterService", 
service_name="CouchbaseClusterService")
+    controller_service.add_property("Connection String", 
f"couchbases://couchbase-server-{context.scenario_id}")
+    controller_service.add_property("Linked Services", "SSLContextService")
+    
context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service)
+
+
+@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present 
with data '{data}' of type \"{data_type}\" in Couchbase")
+def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: 
str):
+    assert 
context.containers["couchbase-server"].is_data_present_in_couchbase(doc_id, 
bucket_name, data, data_type) or log_due_to_failure(context)

Reply via email to