This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ca4fa86a29de25fc42880bc2e775ac23f5bfded3 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Jan 7 17:36:36 2026 +0100 MINIFICPP-2674 Move Couchbase tests to modular docker tests Closes #2066 Signed-off-by: Ferenc Gerlits <[email protected]> --- .../containers/minifi_container.py | 6 +- .../src/minifi_test_framework/core/helpers.py | 19 +++ .../steps/flow_building_steps.py | 25 +++- docker/RunBehaveTests.sh | 3 +- docker/requirements.txt | 1 - docker/test/integration/cluster/ContainerStore.py | 9 -- .../test/integration/cluster/DockerTestCluster.py | 5 - .../cluster/checkers/CouchbaseChecker.py | 69 --------- .../cluster/containers/CouchbaseServerContainer.py | 125 ---------------- .../features/MiNiFi_integration_test_driver.py | 8 - docker/test/integration/features/steps/steps.py | 49 ------ .../minifi/controllers/CouchbaseClusterService.py | 30 ---- .../minifi/processors/PutCouchbaseKey.py | 24 --- .../couchbase/tests}/features/couchbase.feature | 98 +++++++----- .../couchbase/tests/features/environment.py | 28 +++- .../features/steps/couchbase_server_container.py | 165 +++++++++++++++++++++ extensions/couchbase/tests/features/steps/steps.py | 67 +++++++++ 17 files changed, 363 insertions(+), 368 deletions(-) diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py index 8678bdc0f..55d730957 100644 --- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py @@ -21,7 +21,7 @@ from OpenSSL import crypto from minifi_test_framework.core.minifi_test_context import MinifiTestContext from minifi_test_framework.containers.file import File from minifi_test_framework.minifi.flow_definition import FlowDefinition -from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage +from minifi_test_framework.core.ssl_utils import 
make_cert_without_extended_usage, make_client_cert from .container import Container @@ -38,6 +38,10 @@ class MinifiContainer(Container): self.files.append(File("/tmp/resources/minifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert))) self.files.append(File("/tmp/resources/minifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key))) + clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key) + self.files.append(File("/tmp/resources/clientuser.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=clientuser_cert))) + self.files.append(File("/tmp/resources/clientuser.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=clientuser_key))) + self.is_fhs = 'MINIFI_INSTALLATION_TYPE=FHS' in str(self.client.images.get(test_context.minifi_container_image).history()) self._fill_default_properties() diff --git a/behave_framework/src/minifi_test_framework/core/helpers.py b/behave_framework/src/minifi_test_framework/core/helpers.py index e6ae51857..7b7992056 100644 --- a/behave_framework/src/minifi_test_framework/core/helpers.py +++ b/behave_framework/src/minifi_test_framework/core/helpers.py @@ -19,6 +19,7 @@ from __future__ import annotations import logging import time +import functools from collections.abc import Callable import docker @@ -82,3 +83,21 @@ def run_cmd_in_docker_image(image_name: str, cmd: str | list, network: str) -> s def run_shell_cmd_in_docker_image(image_name: str, cmd: str, network: str) -> str: return run_cmd_in_docker_image(image_name, ["/bin/sh", "-c", cmd], network) + + +def retry_check(max_tries: int = 5, retry_interval_seconds: int = 1): + """ + Decorator for retrying a checker function that returns a boolean. The decorated function is called repeatedly until it returns True + or the maximum number of attempts is reached. 
The maximum number of attempts and the interval between attempts in seconds can be configured. + """ + def retry_check_func(func): + @functools.wraps(func) + def retry_wrapper(*args, **kwargs): + for i in range(max_tries): + if func(*args, **kwargs): + return True + if i < max_tries - 1: + time.sleep(retry_interval_seconds) + return False + return retry_wrapper + return retry_check_func diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index 22c3c604d..11e19d104 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -29,6 +29,11 @@ from minifi_test_framework.minifi.parameter_context import ParameterContext from minifi_test_framework.minifi.processor import Processor +@given("a MiNiFi CPP server with yaml config") +def step_impl(context: MinifiTestContext): + pass # TODO(lordgamez): Needs to be implemented after JSON config is set to be default + + @given("a transient MiNiFi flow with a LogOnDestructionProcessor processor") def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().command = ["/bin/sh", "-c", "timeout 10s ./bin/minifi.sh run && sleep 100"] @@ -140,6 +145,7 @@ def step_impl(context: MinifiTestContext, parameter_name: str, parameter_value: @step('a directory at "{directory}" has a file with the content "{content}" in the "{flow_name}" flow') +@step("a directory at '{directory}' has a file with the content '{content}' in the '{flow_name}' flow") def step_impl(context: MinifiTestContext, directory: str, content: str, flow_name: str): new_content = content.replace("\\n", "\n") new_dir = Directory(directory) @@ -148,6 +154,7 @@ def step_impl(context: MinifiTestContext, directory: str, content: str, flow_nam @step('a directory at "{directory}" has a file with the content "{content}"') +@step("a directory at 
'{directory}' has a file with the content '{content}'") def step_impl(context: MinifiTestContext, directory: str, content: str): context.execute_steps(f'given a directory at "{directory}" has a file with the content "{content}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') @@ -240,14 +247,22 @@ def step_impl(context: MinifiTestContext, property_name: str, processor_name: st # TLS -@given("an ssl context service is set up for {processor_name}") -@given("an ssl context service with a manual CA cert file is set up for {processor_name}") -def step_impl(context, processor_name): +def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: str): controller_service = ControllerService(class_name="SSLContextService", service_name="SSLContextService") - controller_service.add_property("Client Certificate", "/tmp/resources/minifi_client.crt") - controller_service.add_property("Private Key", "/tmp/resources/minifi_client.key") + controller_service.add_property("Client Certificate", f"/tmp/resources/{cert_name}.crt") + controller_service.add_property("Private Key", f"/tmp/resources/{cert_name}.key") controller_service.add_property("CA Certificate", "/tmp/resources/root_ca.crt") context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service) + +@given("an ssl context service is set up") +def step_impl(context: MinifiTestContext): + add_ssl_context_service_for_minifi(context, "minifi_client") + + +@given("an ssl context service is set up for {processor_name}") +@given("an ssl context service with a manual CA cert file is set up for {processor_name}") +def step_impl(context, processor_name): + add_ssl_context_service_for_minifi(context, "minifi_client") processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name) processor.add_property('SSL Context Service', 'SSLContextService') diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh index 
786e9e6b3..2f9f4b949 100755 --- a/docker/RunBehaveTests.sh +++ b/docker/RunBehaveTests.sh @@ -200,4 +200,5 @@ exec \ "${docker_dir}/../extensions/sql/tests/features" \ "${docker_dir}/../extensions/llamacpp/tests/features" \ "${docker_dir}/../extensions/opc/tests/features" \ - "${docker_dir}/../extensions/kafka/tests/features" + "${docker_dir}/../extensions/kafka/tests/features" \ + "${docker_dir}/../extensions/couchbase/tests/features" diff --git a/docker/requirements.txt b/docker/requirements.txt index 2a2da597a..77c4c2fdc 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -10,5 +10,4 @@ azure-storage-blob==12.24.1 prometheus-api-client==0.5.5 humanfriendly==10.0 requests<2.29 # https://github.com/docker/docker-py/issues/3113 -couchbase==4.3.5 paho-mqtt==2.1.0 diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 31e1be1b5..f225e1864 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -36,7 +36,6 @@ from .containers.MinifiC2ServerContainer import MinifiC2ServerContainer from .containers.GrafanaLokiContainer import GrafanaLokiContainer from .containers.GrafanaLokiContainer import GrafanaLokiOptions from .containers.ReverseProxyContainer import ReverseProxyContainer -from .containers.CouchbaseServerContainer import CouchbaseServerContainer from .FeatureContext import FeatureContext @@ -266,14 +265,6 @@ class ContainerStore: network=self.network, image_store=self.image_store, command=command)) - elif engine == "couchbase-server": - return self.containers.setdefault(container_name, - CouchbaseServerContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) else: raise Exception('invalid flow engine: \'%s\'' % engine) diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py index 58902f132..2d0aa4aad 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -35,7 +35,6 @@ from .checkers.PrometheusChecker import PrometheusChecker from .checkers.SplunkChecker import SplunkChecker from .checkers.GrafanaLokiChecker import GrafanaLokiChecker from .checkers.ModbusChecker import ModbusChecker -from .checkers.CouchbaseChecker import CouchbaseChecker from .checkers.MqttHelper import MqttHelper from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check @@ -56,7 +55,6 @@ class DockerTestCluster: self.grafana_loki_checker = GrafanaLokiChecker() self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator) self.modbus_checker = ModbusChecker(self.container_communicator) - self.couchbase_checker = CouchbaseChecker() self.mqtt_helper = MqttHelper() def cleanup(self): @@ -447,8 +445,5 @@ class DockerTestCluster: def enable_ssl_in_nifi(self): self.container_store.enable_ssl_in_nifi() - def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str): - return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type) - def publish_test_mqtt_message(self, topic: str, message: str): self.mqtt_helper.publish_test_mqtt_message(topic, message) diff --git a/docker/test/integration/cluster/checkers/CouchbaseChecker.py b/docker/test/integration/cluster/checkers/CouchbaseChecker.py deleted file mode 100644 index 07a332cbf..000000000 --- a/docker/test/integration/cluster/checkers/CouchbaseChecker.py +++ /dev/null @@ -1,69 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -import json -from couchbase.cluster import Cluster -from couchbase.options import ClusterOptions -from couchbase.auth import PasswordAuthenticator -from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder - - -class CouchbaseChecker: - def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str): - try: - cluster = Cluster('couchbase://localhost', ClusterOptions( - PasswordAuthenticator('Administrator', 'password123'))) - - bucket = cluster.bucket(bucket_name) - collection = bucket.default_collection() - - if expected_data_type.lower() == "binary": - binary_flag = 0x03 << 24 - result = collection.get(doc_id, transcoder=RawBinaryTranscoder()) - flags = result.flags - if not flags & binary_flag: - logging.error(f"Expected binary data for document '{doc_id}' but no binary flags were found.") - return False - - content = result.content_as[bytes] - return content.decode('utf-8') == expected_data - - if expected_data_type.lower() == "json": - json_flag = 0x02 << 24 - result = collection.get(doc_id) - flags = result.flags - if not flags & json_flag: - logging.error(f"Expected JSON data for document '{doc_id}' but no JSON flags were found.") - return False - - content = result.content_as[dict] - return content == json.loads(expected_data) - - if expected_data_type.lower() == "string": - string_flag = 0x04 << 24 - result = 
collection.get(doc_id, transcoder=RawStringTranscoder()) - flags = result.flags - if not flags & string_flag: - logging.error(f"Expected string data for document '{doc_id}' but no string flags were found.") - return False - - content = result.content_as[str] - return content == expected_data - - logging.error(f"Unsupported data type '{expected_data_type}'") - return False - except Exception as e: - logging.error(f"Error while fetching document '{doc_id}' from bucket '{bucket_name}': {e}") - return False diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py deleted file mode 100644 index b2f13b732..000000000 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ /dev/null @@ -1,125 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import os -import OpenSSL.crypto -import tempfile -import docker -import requests -import logging -from requests.auth import HTTPBasicAuth -from .Container import Container -from utils import retry_check -from ssl_utils.SSL_cert_utils import make_server_cert - - -class CouchbaseServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, "couchbase-server", vols, network, image_store, command) - couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key) - - self.root_ca_file = tempfile.NamedTemporaryFile(delete=False) - self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert)) - self.root_ca_file.close() - os.chmod(self.root_ca_file.name, 0o666) - - self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False) - self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert)) - self.couchbase_cert_file.close() - os.chmod(self.couchbase_cert_file.name, 0o666) - - self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False) - self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key)) - self.couchbase_key_file.close() - os.chmod(self.couchbase_key_file.name, 0o666) - - def get_startup_finished_log_entry(self): - # after startup the logs are only available in the container, only this message is shown - return "logs available in" - - @retry_check(max_tries=12, retry_interval=5) - def _run_couchbase_cli_command(self, command): - (code, _) = self.client.containers.get(self.name).exec_run(command) - if code != 0: - logging.error(f"Failed to run command '{command}', returned error code: {code}") - return False - return True - - def _run_couchbase_cli_commands(self, commands): - return 
all(self._run_couchbase_cli_command(command) for command in commands) - - @retry_check(max_tries=15, retry_interval=2) - def _load_couchbase_certs(self): - response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123")) - if response.status_code != 200: - logging.error(f"Failed to load CA certificates, with status code: {response.status_code}") - return False - - response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123")) - if response.status_code != 200: - logging.error(f"Failed to reload certificates, with status code: {response.status_code}") - return False - - return True - - def run_post_startup_commands(self): - if self.post_startup_commands_finished: - return True - - commands = [ - ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", - "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], - ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", - "--bucket-ramsize", "1024", "--max-ttl", "36000"], - ["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123", - "--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"], - ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''], - ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json'] - ] - if not self._run_couchbase_cli_commands(commands): - return False - - if not self._load_couchbase_certs(): - return False - - 
self.post_startup_commands_finished = True - return True - - def deploy(self): - if not self.set_deployed(): - return - - mounts = [ - docker.types.Mount( - type='bind', - source=self.couchbase_key_file.name, - target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'), - docker.types.Mount( - type='bind', - source=self.couchbase_cert_file.name, - target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'), - docker.types.Mount( - type='bind', - source=self.root_ca_file.name, - target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt') - ] - - self.docker_container = self.client.containers.run( - "couchbase:enterprise-7.2.5", - detach=True, - name=self.name, - network=self.network.name, - ports={'8091/tcp': 8091, '11210/tcp': 11210}, - entrypoint=self.command, - mounts=mounts) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 9e07421c6..f04d0b6dc 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -85,11 +85,6 @@ class MiNiFi_integration_test: self.cluster.deploy_container('minifi-c2-server') assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output() - def start_couchbase_server(self, context): - self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server') - self.cluster.deploy_container('couchbase-server') - assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output() - def start_nifi(self, context): self.cluster.acquire_container(context=context, name='nifi', engine='nifi') self.cluster.deploy_container('nifi') @@ -507,8 +502,5 @@ class MiNiFi_integration_test: def enable_ssl_in_nifi(self): self.cluster.enable_ssl_in_nifi() - def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: 
str, expected_data_type: str): - assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type) - def publish_test_mqtt_message(self, topic, message): self.cluster.publish_test_mqtt_message(topic, message) diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 9483b0580..4f06d8b7e 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -27,7 +27,6 @@ from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter from minifi.controllers.JsonTreeReader import JsonTreeReader from minifi.controllers.XMLReader import XMLReader from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter -from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService from minifi.controllers.XMLReader import XMLReader from behave import given, then, when @@ -1297,54 +1296,6 @@ def step_impl(context, parameter_context_name): container.set_parameter_context_name(parameter_context_name) -# Couchbase -@when(u'a Couchbase server is started') -def step_impl(context): - context.test.start_couchbase_server(context) - - -@given("a CouchbaseClusterService is setup up with the name \"{service_name}\"") -def step_impl(context, service_name): - couchbase_cluster_controller_service = CouchbaseClusterService( - name=service_name, - connection_string="couchbase://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server"))) - container = context.test.acquire_container(context=context, name="minifi-cpp-flow") - container.add_controller(couchbase_cluster_controller_service) - - -@given("a CouchbaseClusterService is set up up with SSL connection with the name \"{service_name}\"") -def step_impl(context, service_name): - ssl_context_service = SSLContextService(name="SSLContextService", - ca_cert='/tmp/resources/root_ca.crt') - container = 
context.test.acquire_container(context=context, name="minifi-cpp-flow") - container.add_controller(ssl_context_service) - couchbase_cluster_controller_service = CouchbaseClusterService( - name=service_name, - connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), - ssl_context_service=ssl_context_service) - container.add_controller(couchbase_cluster_controller_service) - - -@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase") -def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str): - context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type) - - -@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"") -def step_impl(context, service_name): - ssl_context_service = SSLContextService(name="SSLContextService", - cert='/tmp/resources/clientuser.crt', - key='/tmp/resources/clientuser.key', - ca_cert='/tmp/resources/root_ca.crt') - container = context.test.acquire_container(context=context, name="minifi-cpp-flow") - container.add_controller(ssl_context_service) - couchbase_cluster_controller_service = CouchbaseClusterService( - name=service_name, - connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), - ssl_context_service=ssl_context_service) - container.add_controller(couchbase_cluster_controller_service) - - @given("a LlamaCpp model is present on the MiNiFi host") def step_impl(context): context.test.llama_model_is_downloaded_in_minifi() diff --git a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py deleted file mode 100644 index 94494fe17..000000000 --- a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py 
+++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.ControllerService import ControllerService - - -class CouchbaseClusterService(ControllerService): - def __init__(self, name, connection_string, ssl_context_service=None): - super(CouchbaseClusterService, self).__init__(name=name) - - self.service_class = 'CouchbaseClusterService' - self.properties['Connection String'] = connection_string - if ssl_context_service: - self.linked_services.append(ssl_context_service) - if not ssl_context_service or ssl_context_service and 'Client Certificate' not in ssl_context_service.properties: - self.properties['User Name'] = "Administrator" - self.properties['User Password'] = "password123" diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py b/docker/test/integration/minifi/processors/PutCouchbaseKey.py deleted file mode 100644 index 341338771..000000000 --- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from ..core.Processor import Processor - - -class PutCouchbaseKey(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PutCouchbaseKey, self).__init__( - context=context, - clazz='PutCouchbaseKey', - auto_terminate=['success', 'failure', 'retry'], - schedule=schedule) diff --git a/docker/test/integration/features/couchbase.feature b/extensions/couchbase/tests/features/couchbase.feature similarity index 81% rename from docker/test/integration/features/couchbase.feature rename to extensions/couchbase/tests/features/couchbase.feature index ecf4ce8a1..3464f0410 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/extensions/couchbase/tests/features/couchbase.feature @@ -15,26 +15,27 @@ @ENABLE_COUCHBASE Feature: Executing Couchbase operations from MiNiFi-C++ - Background: - Given the content of "/tmp/output" is monitored Scenario: A MiNiFi instance can insert json data to test bucket with PutCouchbaseKey processor Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" 
property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "Json" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "retry" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds @@ -46,21 +47,24 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can insert binary data to test bucket with PutCouchbaseKey processor Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the 
"Document Type" property of the PutCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "retry" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds @@ -72,16 +76,20 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property 
of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -89,11 +97,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds @@ -101,18 +110,22 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using binary storage Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of 
the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -120,11 +133,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds @@ -132,19 +146,23 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor and put the result in an attribute Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to 
"String" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the GetCouchbaseKey processor is set to "String" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And the "Put Value to Attribute" property of the GetCouchbaseKey processor is set to "get_couchbase_result" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -152,11 +170,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds @@ -165,24 +184,27 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on Couchbase value type mismatch Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "String" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the 
PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "retry" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And GetCouchbaseKey's failure relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 100 seconds @@ -190,16 +212,21 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And the scheduling period of the GetFile processor is set to "20 seconds" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set 
to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is set up up with SSL connection with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + + And a CouchbaseClusterService is set up using SSL connection And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -207,29 +234,33 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication - Given a MiNiFi CPP server with yaml config - And a GetFile 
processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up using mTLS authentication with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService is setup up using mTLS authentication And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -237,11 +268,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's 
success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds diff --git a/docker/test/integration/minifi/processors/GetCouchbaseKey.py b/extensions/couchbase/tests/features/environment.py similarity index 53% rename from docker/test/integration/minifi/processors/GetCouchbaseKey.py rename to extensions/couchbase/tests/features/environment.py index 0b48dcbdd..01c4d4791 100644 --- a/docker/test/integration/minifi/processors/GetCouchbaseKey.py +++ b/extensions/couchbase/tests/features/environment.py @@ -12,13 +12,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder
from minifi_test_framework.core.hooks import common_before_scenario
from minifi_test_framework.core.hooks import common_after_scenario


def before_all(context):
    """behave ``before_all`` hook: build the ``minifi-couchbase-helper`` image.

    The image is a slim Python base with the ``couchbase`` SDK and ``requests``
    installed, tagged ``minifi-couchbase-helper:latest``.
    """
    helper_dockerfile = """
    FROM python:3.13-slim-bookworm
    RUN pip install couchbase==4.3.5 requests"""
    DockerImageBuilder(
        image_tag="minifi-couchbase-helper:latest",
        dockerfile_content=helper_dockerfile,
    ).build()


def before_scenario(context, scenario):
    """behave ``before_scenario`` hook: delegate to the shared framework hook."""
    common_before_scenario(context, scenario)


def after_scenario(context, scenario):
    """behave ``after_scenario`` hook: delegate to the shared framework hook."""
    common_after_scenario(context, scenario)
import logging

from OpenSSL import crypto
from minifi_test_framework.core.helpers import wait_for_condition, retry_check
from minifi_test_framework.core.ssl_utils import make_server_cert
from minifi_test_framework.containers.container import Container
from minifi_test_framework.containers.file import File
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from docker.errors import ContainerError


class CouchbaseServerContainer(Container):
    """Couchbase Enterprise server container used by the Couchbase behave tests.

    On construction a server certificate signed by the test root CA is generated
    and, together with the root CA and the private key, queued for placement in
    the Couchbase certificate inbox. ``deploy`` starts the container, initialises
    the cluster, creates ``test_bucket`` and the ``clientuser`` account, enables
    certificate based client authentication and loads the certificates.
    """

    # Body of the verification script executed in the helper image. It is a
    # plain (non f-) string: the values it works on are injected in front of it
    # as repr() literals by is_data_present_in_couchbase, so document content
    # containing quotes or backslashes cannot break the generated source.
    # Diagnostics go to stderr because that is the only stream docker-py hands
    # back when the container exits with a non-zero status.
    _DATA_CHECK_SCRIPT = """
import json
import sys

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder


def fail(message):
    print(message, file=sys.stderr)
    sys.exit(1)


try:
    cluster = Cluster(connection_string, ClusterOptions(
        PasswordAuthenticator('Administrator', 'password123')))
    collection = cluster.bucket(bucket_name).default_collection()

    if expected_data_type == "binary":
        result = collection.get(doc_id, transcoder=RawBinaryTranscoder())
        if not result.flags & (0x03 << 24):
            fail(f"Expected binary data for document '{doc_id}' but no binary flags were found.")
        actual = result.content_as[bytes].decode('utf-8')
        matches = actual == expected_data
    elif expected_data_type == "json":
        result = collection.get(doc_id)
        if not result.flags & (0x02 << 24):
            fail(f"Expected JSON data for document '{doc_id}' but no JSON flags were found.")
        actual = result.content_as[dict]
        matches = actual == json.loads(expected_data)
    elif expected_data_type == "string":
        result = collection.get(doc_id, transcoder=RawStringTranscoder())
        if not result.flags & (0x04 << 24):
            fail(f"Expected string data for document '{doc_id}' but no string flags were found.")
        actual = result.content_as[str]
        matches = actual == expected_data
    else:
        fail(f"Unsupported expected data type '{expected_data_type}'.")
except Exception as e:
    fail(f"Failed to verify document '{doc_id}' in bucket '{bucket_name}': {e!r}")

if not matches:
    fail(f"Content mismatch for document '{doc_id}': expected {expected_data!r}, found {actual!r}")
sys.exit(0)
"""

    def __init__(self, test_context: MinifiTestContext):
        super().__init__("couchbase:enterprise-7.2.5", f"couchbase-server-{test_context.scenario_id}", test_context.network)

        couchbase_cert, couchbase_key = make_server_cert(self.container_name, test_context.root_ca_cert, test_context.root_ca_key)

        root_ca_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)
        self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt", root_ca_content, permissions=0o666))
        couchbase_cert_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=couchbase_cert)
        self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/chain.pem", couchbase_cert_content, permissions=0o666))
        couchbase_key_content = crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=couchbase_key)
        self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/pkey.key", couchbase_key_content, permissions=0o666))

    def deploy(self):
        """Start the container, wait for the startup log line, then configure the cluster.

        Returns the result of ``run_post_startup_commands``. Raises
        ``AssertionError`` when "logs available in" does not show up in the
        container logs within 15 seconds or the container exits first.
        """
        super().deploy()
        finished_str = "logs available in"
        assert wait_for_condition(
            condition=lambda: finished_str in self.get_logs(),
            timeout_seconds=15,
            bail_condition=lambda: self.exited,
            context=None), f"Couchbase server container '{self.container_name}' did not finish starting up"
        return self.run_post_startup_commands()

    def run_post_startup_commands(self):
        """Initialise the cluster, bucket, user and TLS settings.

        Returns ``True`` only when every ``couchbase-cli`` command succeeded and
        the certificates were loaded afterwards; ``False`` otherwise.
        """
        commands = [
            ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query",
             "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
            ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase",
             "--bucket-ramsize", "1024", "--max-ttl", "36000"],
            ["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123",
             "--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"],
            # Client certificate authentication: the user name is taken from the certificate's subject CN.
            ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
            ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
        ]
        # Certificates are only loaded once the CLI setup went through (short-circuit).
        return bool(self._run_couchbase_cli_commands(commands) and self._load_couchbase_certs())

    @retry_check(max_tries=12, retry_interval=5)
    def _run_couchbase_cli_command(self, command):
        """Run one command inside the container; log and return ``False`` on a non-zero exit code."""
        (code, output) = self.exec_run(command)
        if code != 0:
            logging.error(f"Failed to run command '{command}', returned error code: {code}, output: '{output}'")
            return False
        return True

    def _run_couchbase_cli_commands(self, commands):
        """Run the commands in order, stopping at the first one that fails."""
        return all(self._run_couchbase_cli_command(command) for command in commands)

    @staticmethod
    def _decode_output(raw) -> str:
        """Turn captured container output (bytes, str or ``None``) into text for logging."""
        if not raw:
            return ""
        if isinstance(raw, bytes):
            return raw.decode("utf-8", errors="replace")
        return str(raw)

    def _run_python_in_couchbase_helper_docker(self, command: str):
        """Run ``python -c <command>`` in a throw-away helper container on the test network.

        Returns ``True`` when the container exits with status 0 and ``False``
        (after logging the failure) otherwise.
        """
        try:
            self.client.containers.run("minifi-couchbase-helper:latest", ["python", "-c", command], remove=True, stdout=True, stderr=True, network=self.network.name)
            return True
        except ContainerError as e:
            # docker-py's ContainerError only carries the captured stderr; reading a
            # missing `stdout` attribute directly would raise AttributeError from
            # inside this handler and defeat the "return False" contract.
            stdout = self._decode_output(getattr(e, "stdout", None))
            stderr = self._decode_output(e.stderr)
            logging.error(f"Python command '{command}' failed in couchbase helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'")
            return False
        except Exception as e:
            logging.error(f"Unexpected error while running python command '{command}' in couchbase helper docker: '{e}'")
            return False

    @retry_check(max_tries=15, retry_interval=2)
    def _load_couchbase_certs(self):
        """Ask the server to load the trusted CAs and then reload its node certificate.

        Both REST calls must answer with HTTP 200 for the method to return ``True``.
        """
        base_url = f"http://{self.container_name}:8091/node/controller"
        # The timeout keeps a non-responsive server from blocking the helper
        # container forever; the failure reason is printed to stderr so that it
        # ends up in the log written by _run_python_in_couchbase_helper_docker.
        python_command = f"""
import sys
import requests
from requests.auth import HTTPBasicAuth

auth = HTTPBasicAuth("Administrator", "password123")
for endpoint in ("loadTrustedCAs", "reloadCertificate"):
    response = requests.post({base_url!r} + "/" + endpoint, auth=auth, timeout=10)
    if response.status_code != 200:
        print(endpoint + " returned HTTP " + str(response.status_code) + ": " + response.text, file=sys.stderr)
        sys.exit(1)
sys.exit(0)
"""
        return self._run_python_in_couchbase_helper_docker(python_command)

    def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
        """Check that a document with the expected content and type flags exists.

        Args:
            doc_id: id of the document to fetch from the bucket's default collection.
            bucket_name: name of the bucket to look in.
            expected_data: expected content; parsed as JSON when the type is "json",
                compared as text otherwise.
            expected_data_type: "binary", "json" or "string" (case-insensitive).

        Returns:
            ``True`` when the document was fetched, carries the matching type flag
            and its content equals ``expected_data``; ``False`` in every other case.
        """
        connection_string = f"couchbase://{self.container_name}"
        # Inject the inputs as Python literals via repr() instead of pasting them
        # between hand-written quotes.
        preamble = "\n".join([
            f"connection_string = {connection_string!r}",
            f"bucket_name = {bucket_name!r}",
            f"doc_id = {doc_id!r}",
            f"expected_data = {expected_data!r}",
            f"expected_data_type = {expected_data_type.lower()!r}",
        ])
        return self._run_python_in_couchbase_helper_docker(preamble + "\n" + self._DATA_CHECK_SCRIPT)
from behave import step, then

from minifi_test_framework.steps import checking_steps  # noqa: F401
from minifi_test_framework.steps import configuration_steps  # noqa: F401
from minifi_test_framework.steps import core_steps  # noqa: F401
from minifi_test_framework.steps import flow_building_steps  # noqa: F401
from minifi_test_framework.steps.flow_building_steps import add_ssl_context_service_for_minifi
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from minifi_test_framework.core.helpers import log_due_to_failure
from minifi_test_framework.minifi.controller_service import ControllerService
from couchbase_server_container import CouchbaseServerContainer


def _couchbase_connection_string(context: MinifiTestContext, scheme: str) -> str:
    """Build the connection string for the scenario's Couchbase server host."""
    return f"{scheme}://couchbase-server-{context.scenario_id}"


def _add_couchbase_cluster_service(context: MinifiTestContext, scheme: str, extra_properties) -> None:
    """Add a CouchbaseClusterService named "CouchbaseClusterService" to the default MiNiFi flow.

    "Connection String" is always set first; ``extra_properties`` is an ordered
    sequence of ``(name, value)`` pairs that are added after it.
    """
    service = ControllerService(class_name="CouchbaseClusterService", service_name="CouchbaseClusterService")
    service.add_property("Connection String", _couchbase_connection_string(context, scheme))
    for name, value in extra_properties:
        service.add_property(name, value)
    context.get_or_create_default_minifi_container().flow_definition.controller_services.append(service)


@step("a Couchbase server is started")
def step_impl(context: MinifiTestContext):
    """Create the Couchbase server container, register it and deploy it."""
    context.containers["couchbase-server"] = CouchbaseServerContainer(context)
    assert context.containers["couchbase-server"].deploy()


@step("a CouchbaseClusterService controller service is set up to communicate with the Couchbase server")
def step_impl(context: MinifiTestContext):
    """Plain-text connection authenticated with user name and password."""
    _add_couchbase_cluster_service(context, "couchbase", [
        ("User Name", "Administrator"),
        ("User Password", "password123"),
    ])


@step("a CouchbaseClusterService is set up using SSL connection")
def step_impl(context: MinifiTestContext):
    """TLS connection (``couchbases://``) with password auth, linked to an SSLContextService."""
    ssl_context_service = ControllerService(class_name="SSLContextService", service_name="SSLContextService")
    ssl_context_service.add_property("CA Certificate", "/tmp/resources/root_ca.crt")
    context.get_or_create_default_minifi_container().flow_definition.controller_services.append(ssl_context_service)
    _add_couchbase_cluster_service(context, "couchbases", [
        ("User Name", "Administrator"),
        ("User Password", "password123"),
        ("Linked Services", "SSLContextService"),
    ])


# The original wording ("setup up") is kept because existing feature files use
# it; the second decorator registers the correctly spelled phrase as an alias.
@step("a CouchbaseClusterService is setup up using mTLS authentication")
@step("a CouchbaseClusterService is set up using mTLS authentication")
def step_impl(context: MinifiTestContext):
    """TLS connection without user name/password, linked to the SSL context service for "clientuser"."""
    add_ssl_context_service_for_minifi(context, "clientuser")
    _add_couchbase_cluster_service(context, "couchbases", [
        ("Linked Services", "SSLContextService"),
    ])


@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
def step_impl(context: MinifiTestContext, doc_id: str, bucket_name: str, data: str, data_type: str):
    """Assert that the document exists with the given content and type; on failure call log_due_to_failure."""
    assert context.containers["couchbase-server"].is_data_present_in_couchbase(doc_id, bucket_name, data, data_type) or log_due_to_failure(context)
