This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9b84de3b8e56db67cd775b9a7d089a5ebd522343 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Jan 7 17:34:33 2026 +0100 MINIFICPP-2668 Move standard processor tests to modular docker tests Closes #2061 Signed-off-by: Ferenc Gerlits <[email protected]> --- .../minifi_test_framework/containers/container.py | 72 +++++++++++++++++----- .../containers/minifi_container.py | 3 + .../minifi_test_framework/steps/checking_steps.py | 21 ++++++- .../src/minifi_test_framework/steps/core_steps.py | 17 +++++ docker/test/integration/cluster/ContainerStore.py | 18 ------ docker/test/integration/cluster/ImageStore.py | 5 -- .../integration/cluster/containers/DiagSlave.py | 36 ----------- .../cluster/containers/TcpClientContainer.py | 39 ------------ docker/test/integration/features/steps/steps.py | 12 ---- .../minifi/processors/DefragmentText.py | 26 -------- .../minifi/processors/EvaluateJsonPath.py | 22 ------- .../minifi/processors/FetchModbusTcp.py | 26 -------- .../integration/minifi/processors/ListenTCP.py | 27 -------- .../integration/minifi/processors/SplitJson.py | 26 -------- .../integration/resources/diagslave/Dockerfile | 5 -- .../tests/features/attributes_to_json.feature | 2 +- .../tests}/features/defragtextflowfiles.feature | 25 ++++---- .../tests/features/environment.py | 9 ++- .../tests}/features/evaluate_json_path.feature | 17 +++-- .../tests}/features/fetch_modbus_tcp.feature | 12 ++-- .../tests}/features/file_system_operations.feature | 38 +++++++----- .../tests}/features/hashcontent.feature | 23 ++++--- .../tests}/features/network_listener.feature | 9 ++- .../tests}/features/split_json.feature | 24 ++++---- .../tests/features/steps/diag_slave_container.py | 56 +++++++++++++++++ .../tests/features/steps/steps.py | 18 ++++++ .../tests/features/steps/tcp_client_container.py | 40 ++++++++++++ 27 files changed, 303 insertions(+), 325 deletions(-) diff --git a/behave_framework/src/minifi_test_framework/containers/container.py b/behave_framework/src/minifi_test_framework/containers/container.py index e01c248d3..07abded28 100644 --- a/behave_framework/src/minifi_test_framework/containers/container.py +++ b/behave_framework/src/minifi_test_framework/containers/container.py @@ -134,7 +134,7 @@ class Container: return code, output.decode("utf-8") return None, "Container not running." - def not_empty_dir_exists(self, directory_path: str) -> bool: + def nonempty_dir_exists(self, directory_path: str) -> bool: if not self.container: return False dir_exists_exit_code, dir_exists_output = self.exec_run( @@ -145,8 +145,18 @@ class Container: "sh -c {}".format(shlex.quote(f'[ "$(ls -A {directory_path})" ]'))) return dir_not_empty_ec == 0 + def directory_contains_empty_file(self, directory_path: str) -> bool: + if not self.container or not self.nonempty_dir_exists(directory_path): + return False + + command = "sh -c {}".format(shlex.quote(f"find {directory_path} -maxdepth 1 -type f -empty")) + + exit_code, _ = self.exec_run(command) + + return exit_code == 0 + def directory_contains_file_with_content(self, directory_path: str, expected_content: str) -> bool: - if not self.container or not self.not_empty_dir_exists(directory_path): + if not self.container or not self.nonempty_dir_exists(directory_path): return False quoted_content = shlex.quote(expected_content) @@ -157,7 +167,7 @@ class Container: return exit_code == 0 def directory_contains_file_with_regex(self, directory_path: str, regex_str: str) -> bool: - if not self.container or not self.not_empty_dir_exists(directory_path): + if not self.container or not self.nonempty_dir_exists(directory_path): return False safe_dir_path = shlex.quote(directory_path) @@ -187,7 +197,7 @@ class Container: return file_count == 1 def directory_has_single_file_with_content(self, directory_path: str, expected_content: str) -> bool: - if not self.container or not self.not_empty_dir_exists(directory_path): + if not self.container or not self.nonempty_dir_exists(directory_path): return False count_command = f"sh -c 'find {directory_path} -maxdepth 1 -type f | wc -l'" @@ -243,7 +253,7 @@ class Container: logging.warning("Container not running") return -1 - if not self.not_empty_dir_exists(directory_path): + if not self.nonempty_dir_exists(directory_path): logging.warning(f"Container directory does not exist: {directory_path}") return 0 @@ -262,10 +272,7 @@ class Container: logging.error(f"Error parsing output '{output}' from command '{count_command}'") return -1 - def _verify_file_contents_in_running_container(self, directory_path: str, expected_contents: list[str]) -> bool: - if not self.not_empty_dir_exists(directory_path): - return False - + def _get_contents_of_all_files_in_directory(self, directory_path: str) -> list[str] | None: safe_dir_path = shlex.quote(directory_path) list_files_command = f"find {safe_dir_path} -mindepth 1 -maxdepth 1 -type f -print0" @@ -273,14 +280,10 @@ class Container: if exit_code != 0: logging.error(f"Error running command '{list_files_command}': {output}") - return False + return None actual_filepaths = [path for path in output.split('\0') if path] - if len(actual_filepaths) != len(expected_contents): - logging.debug(f"Expected {len(expected_contents)} files, but found {len(actual_filepaths)}") - return False - actual_file_contents = [] for path in actual_filepaths: safe_path = shlex.quote(path) @@ -291,10 +294,24 @@ class Container: if exit_code != 0: error_message = f"Command to read file '{path}' failed with exit code {exit_code}" logging.error(error_message) - return False + return None actual_file_contents.append(content) + return actual_file_contents + + def _verify_file_contents_in_running_container(self, directory_path: str, expected_contents: list[str]) -> bool: + if not self.nonempty_dir_exists(directory_path): + return False + + actual_file_contents = self._get_contents_of_all_files_in_directory(directory_path) + if actual_file_contents is None: + return False + + if len(actual_file_contents) != len(expected_contents): + logging.debug(f"Expected {len(expected_contents)} files, but found {len(actual_file_contents)}") + return False + return sorted(actual_file_contents) == sorted(expected_contents) def _verify_file_contents_in_stopped_container(self, directory_path: str, expected_contents: list[str]) -> bool: @@ -358,7 +375,8 @@ class Container: return False def verify_path_with_json_content(self, directory_path: str, expected_str: str) -> bool: - if not self.container or not self.not_empty_dir_exists(directory_path): + if not self.container or not self.nonempty_dir_exists(directory_path): + logging.warning(f"Container not running or directory does not exist: {directory_path}") return False count_command = f"sh -c 'find {directory_path} -maxdepth 1 -type f | wc -l'" @@ -390,3 +408,25 @@ class Container: expected_json = json.loads(expected_str) return actual_json == expected_json + + def directory_contains_file_with_json_content(self, directory_path: str, expected_content: str) -> bool: + if not self.container or not self.nonempty_dir_exists(directory_path): + logging.warning(f"Container not running or directory does not exist: {directory_path}") + return False + + actual_file_contents = self._get_contents_of_all_files_in_directory(directory_path) + if actual_file_contents is None: + return False + + for file_content in actual_file_contents: + try: + actual_json = json.loads(file_content) + expected_json = json.loads(expected_content) + if actual_json == expected_json: + return True + logging.warning(f"File content does not match expected JSON: {file_content}") + except json.JSONDecodeError: + logging.error("Error decoding JSON content from file.") + continue + + return False diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py index 84124ed8c..8678bdc0f 100644 --- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py @@ -66,6 +66,9 @@ class MinifiContainer(Container): def set_log_property(self, key: str, value: str): self.log_properties[key] = value + def enable_openssl_fips_mode(self): + self.properties["nifi.openssl.fips.support.enable"] = "true" + def _fill_default_properties(self): if self.is_fhs: self.properties["nifi.flow.configuration.file"] = "/etc/nifi-minifi-cpp/config.yml" diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py index f0376e5ee..4ae80cfa6 100644 --- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py @@ -155,8 +155,8 @@ def step_impl(context: MinifiTestContext, directory: str, timeout: str, contents context=context) -@then("a flowfile with the JSON content \"{content}\" is placed in {directory} in less than {duration}") -@then("a flowfile with the JSON content '{content}' is placed in {directory} in less than {duration}") +@then("a file with the JSON content \"{content}\" is placed in the \"{directory}\" directory in less than {duration}") +@then("a file with the JSON content '{content}' is placed in the '{directory}' directory in less than {duration}") def step_impl(context: MinifiTestContext, content: str, directory: str, duration: str): timeout_in_seconds = humanfriendly.parse_timespan(duration) assert wait_for_condition( @@ -174,6 +174,15 @@ def step_impl(context: MinifiTestContext, max_increase: str, duration: str): assert final_memory_usage - initial_memory_usage <= max_increase_in_bytes +@then("at least one file with the JSON content \"{content}\" is placed in the \"{directory}\" directory in less than {duration}") +@then("at least one file with the JSON content '{content}' is placed in the '{directory}' directory in less than {duration}") +def step_impl(context: MinifiTestContext, content: str, directory: str, duration: str): + timeout_in_seconds = humanfriendly.parse_timespan(duration) + assert wait_for_condition( + condition=lambda: context.get_default_minifi_container().directory_contains_file_with_json_content(directory, content), + timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context) + + @then('after a wait of {duration}, at least {lower_bound:d} and at most {upper_bound:d} files are produced and placed in the "{directory}" directory') def step_impl(context, lower_bound, upper_bound, duration, directory): duration_seconds = humanfriendly.parse_timespan(duration) @@ -197,3 +206,11 @@ def step_impl(context, directory, duration, contents): @then('exactly these files are in the "{directory}" directory in less than {duration}: ""') def step_impl(context, directory, duration): context.execute_steps(f'then no files are placed in the "{directory}" directory in {duration} of running time') + + +@then("at least one empty file is placed in the \"{directory}\" directory in less than {duration}") +def step_impl(context, directory, duration): + timeout_in_seconds = humanfriendly.parse_timespan(duration) + assert wait_for_condition( + condition=lambda: context.get_default_minifi_container().directory_contains_empty_file(directory), + timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context) diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py index c2369f5ad..eb4050ab6 100644 --- a/behave_framework/src/minifi_test_framework/steps/core_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py @@ -20,6 +20,7 @@ import random import string import os import time +import uuid import humanfriendly from behave import when, step, given @@ -58,6 +59,17 @@ def step_impl(context: MinifiTestContext, file_name: str, content: str, path: st context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, file_name), new_content)) +@step('a file with the content "{content}" is present in "{path}"') +def step_impl(context: MinifiTestContext, content: str, path: str): + new_content = content.replace("\\n", "\n") + context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, str(uuid.uuid4())), new_content)) + + +@given("an empty file is present in \"{path}\"") +def step_impl(context, path): + context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, str(uuid.uuid4())), "")) + + @given('a host resource file "{filename}" is bound to the "{container_path}" path in the MiNiFi container "{container_name}"') def step_impl(context: MinifiTestContext, filename: str, container_path: str, container_name: str): path = os.path.join(context.resource_dir, filename) @@ -83,3 +95,8 @@ def step_impl(context: MinifiTestContext): @when("MiNiFi is restarted") def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().restart() + + +@given("OpenSSL FIPS mode is enabled in MiNiFi") +def step_impl(context): + context.get_or_create_default_minifi_container().enable_openssl_fips_mode() diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index d47f59ab1..31e1be1b5 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -31,13 +31,11 @@ from .containers.OpensearchContainer import OpensearchContainer from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer from .containers.MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster -from .containers.TcpClientContainer import TcpClientContainer from .containers.PrometheusContainer import PrometheusContainer from .containers.MinifiC2ServerContainer import MinifiC2ServerContainer from .containers.GrafanaLokiContainer import GrafanaLokiContainer from .containers.GrafanaLokiContainer import GrafanaLokiOptions from .containers.ReverseProxyContainer import ReverseProxyContainer -from .containers.DiagSlave import DiagSlave from .containers.CouchbaseServerContainer import CouchbaseServerContainer from .FeatureContext import FeatureContext @@ -216,14 +214,6 @@ class ContainerStore: network=self.network, image_store=self.image_store, command=command)) - elif engine == "tcp-client": - return self.containers.setdefault(container_name, - TcpClientContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) elif engine == "prometheus": return self.containers.setdefault(container_name, PrometheusContainer(feature_context=feature_context, @@ -276,14 +266,6 @@ class ContainerStore: network=self.network, image_store=self.image_store, command=command)) - elif engine == "diag-slave-tcp": - return self.containers.setdefault(container_name, - DiagSlave(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) elif engine == "couchbase-server": return self.containers.setdefault(container_name, CouchbaseServerContainer(feature_context=feature_context, diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 06793c26c..88d06cff9 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -71,8 +71,6 @@ class ImageStore: image = self.__build_kinesis_image() elif container_engine == "reverse-proxy": image = self.__build_reverse_proxy_image() - elif container_engine == "diag-slave-tcp": - image = self.__build_diagslave_image() else: raise Exception("There is no associated image for " + container_engine) @@ -303,9 +301,6 @@ class ImageStore: def __build_reverse_proxy_image(self): return self.__build_image_by_path(self.test_dir + "/resources/reverse-proxy", 'reverse-proxy') - def __build_diagslave_image(self): - return self.__build_image_by_path(self.test_dir + "/resources/diagslave", 'diag-slave-tcp') - def __build_image(self, dockerfile, context_files=[]): conf_dockerfile_buffer = BytesIO() docker_context_buffer = BytesIO() diff --git a/docker/test/integration/cluster/containers/DiagSlave.py b/docker/test/integration/cluster/containers/DiagSlave.py deleted file mode 100644 index 25c70f7b2..000000000 --- a/docker/test/integration/cluster/containers/DiagSlave.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -from .Container import Container - - -class DiagSlave(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'diag-slave-tcp', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "Server started up successfully." - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running a DiagSlave docker container...') - self.client.containers.run( - self.image_store.get_image(self.get_engine()), - detach=True, - name=self.name, - network=self.network.name) - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/cluster/containers/TcpClientContainer.py b/docker/test/integration/cluster/containers/TcpClientContainer.py deleted file mode 100644 index dc27e53e4..000000000 --- a/docker/test/integration/cluster/containers/TcpClientContainer.py +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -from .Container import Container - - -class TcpClientContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'tcp-client', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "TCP client container started" - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running a tcp client docker container...') - self.client.containers.run( - "alpine:3.17.3", - detach=True, - name=self.name, - network=self.network.name, - entrypoint='/bin/sh', - command="-c 'apk add netcat-openbsd && echo TCP client container started; while true; do echo " - f"test_tcp_message | nc minifi-cpp-flow-{self.feature_context.id} 10254; sleep 1; done'") - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index fbf62251b..9483b0580 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1297,18 +1297,6 @@ def step_impl(context, parameter_context_name): container.set_parameter_context_name(parameter_context_name) -# Modbus -@given(u'there is an accessible PLC with modbus enabled') -def step_impl(context): - context.test.acquire_container(context=context, name="diag-slave-tcp", engine="diag-slave-tcp") - context.test.start('diag-slave-tcp') - - -@given(u'PLC register has been set with {modbus_cmd} command') -def step_impl(context, modbus_cmd): - context.test.set_value_on_plc_with_modbus(context.test.get_container_name_with_postfix('diag-slave-tcp'), modbus_cmd) - - # Couchbase @when(u'a Couchbase server is started') def step_impl(context): diff --git a/docker/test/integration/minifi/processors/DefragmentText.py b/docker/test/integration/minifi/processors/DefragmentText.py deleted file mode 100644 index c00750fe9..000000000 --- a/docker/test/integration/minifi/processors/DefragmentText.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class DefragmentText(Processor): - def __init__(self, context, delimiter="<[0-9]+>", schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(DefragmentText, self).__init__(context=context, - clazz='DefragmentText', - schedule=schedule, - properties={'Delimiter': delimiter}, - auto_terminate=['success', 'failure']) diff --git a/docker/test/integration/minifi/processors/EvaluateJsonPath.py b/docker/test/integration/minifi/processors/EvaluateJsonPath.py deleted file mode 100644 index 7874963e5..000000000 --- a/docker/test/integration/minifi/processors/EvaluateJsonPath.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class EvaluateJsonPath(Processor): - def __init__(self, context): - super(EvaluateJsonPath, self).__init__(context=context, clazz='EvaluateJsonPath') diff --git a/docker/test/integration/minifi/processors/FetchModbusTcp.py b/docker/test/integration/minifi/processors/FetchModbusTcp.py deleted file mode 100644 index 0141817cf..000000000 --- a/docker/test/integration/minifi/processors/FetchModbusTcp.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from ..core.Processor import Processor - - -class FetchModbusTcp(Processor): - def __init__(self, context): - super(FetchModbusTcp, self).__init__( - context=context, - clazz='FetchModbusTcp', - properties={ - 'Hostname': f'diag-slave-tcp-{context.feature_id}', - }, - auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/minifi/processors/ListenTCP.py b/docker/test/integration/minifi/processors/ListenTCP.py deleted file mode 100644 index f2f201f58..000000000 --- a/docker/test/integration/minifi/processors/ListenTCP.py +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from ..core.Processor import Processor - - -class ListenTCP(Processor): - def __init__(self, context, schedule=None): - properties = {} - - super(ListenTCP, self).__init__( - context=context, - clazz='ListenTCP', - properties=properties, - auto_terminate=['success'], - schedule=schedule) diff --git a/docker/test/integration/minifi/processors/SplitJson.py b/docker/test/integration/minifi/processors/SplitJson.py deleted file mode 100644 index d1b6087f7..000000000 --- a/docker/test/integration/minifi/processors/SplitJson.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class SplitJson(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(SplitJson, self).__init__( - context=context, - clazz='SplitJson', - schedule=schedule, - auto_terminate=['original', "split", "failure"]) diff --git a/docker/test/integration/resources/diagslave/Dockerfile b/docker/test/integration/resources/diagslave/Dockerfile deleted file mode 100644 index 4b7d29ba2..000000000 --- a/docker/test/integration/resources/diagslave/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM panterdsd/diagslave:latest -RUN pip install modbus-cli - -ENV PROTOCOL=tcp - diff --git a/extensions/standard-processors/tests/features/attributes_to_json.feature b/extensions/standard-processors/tests/features/attributes_to_json.feature index 652433410..337847ebb 100644 --- a/extensions/standard-processors/tests/features/attributes_to_json.feature +++ b/extensions/standard-processors/tests/features/attributes_to_json.feature @@ -27,4 +27,4 @@ Feature: Writing attribute data using AttributesToJSON processor And the "success" relationship of the AttributesToJSON processor is connected to the PutFile And PutFile's success relationship is auto-terminated When the MiNiFi instance starts up - Then a flowfile with the JSON content "{"invalid":null,"filename":"test_file.log"}" is placed in "/tmp/output" in less than 20 seconds + Then a file with the JSON content "{"invalid":null,"filename":"test_file.log"}" is placed in the "/tmp/output" directory in less than 20 seconds diff --git a/docker/test/integration/features/defragtextflowfiles.feature b/extensions/standard-processors/tests/features/defragtextflowfiles.feature similarity index 88% rename from docker/test/integration/features/defragtextflowfiles.feature rename to extensions/standard-processors/tests/features/defragtextflowfiles.feature index 44eb0cf60..d61e2e97f 100644 --- a/docker/test/integration/features/defragtextflowfiles.feature +++ b/extensions/standard-processors/tests/features/defragtextflowfiles.feature @@ -15,8 +15,6 @@ @CORE Feature: DefragmentText can defragment fragmented data from TailFile - Background: - Given the content of "/tmp/output" is monitored Scenario Outline: DefragmentText correctly merges split messages from TailFile multiple file tail-mode Given a TailFile processor with the name "MultiTail" and the "File to Tail" property set to "test_file_.*\.log" @@ -28,18 +26,20 @@ Feature: DefragmentText can defragment fragmented data from TailFile And a file with filename "test_file_two.log" and content "<input_two>" is present in "/tmp/input" And a DefragmentText processor with the "Pattern" property set to "<pattern>" And the "Pattern Location" property of the DefragmentText processor is set to "<pattern location>" + And DefragmentText is EVENT_DRIVEN And a PutFile processor with the name "SuccessPut" and the "Directory" property set to "/tmp/output" + And SuccessPut is EVENT_DRIVEN And the "success" relationship of the MultiTail processor is connected to the DefragmentText And the "success" relationship of the DefragmentText processor is connected to the SuccessPut - + And SuccessPut's success relationship is auto-terminated When all instances start up - Then flowfiles with these contents are placed in the monitored directory in less than 60 seconds: "<success_flow_files>" + Then files with contents "<success_flow_files>" are placed in the "/tmp/output" directory in less than 60 seconds Examples: | input_one | input_two | pattern | pattern location | success_flow_files | | <1>cat%dog%mouse%<2>apple%banana%<3>English% | <1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch% | <[0-9]+> | Start of Message | <1>cat%dog%mouse%,<1>Katze%Hund%Maus%,<2>apple%banana%,<2>Apfel%Banane% | - | <1>cat%dog%mouse%<2>apple%banana%<3>English% | <1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch% | <[0-9]+> | End of Message | <1>,cat%dog%mouse%<2>,Katze%Hund%Maus%<2>,apple%banana%<3>,Apfel%Banane%<3> | + | <1>cat%dog%mouse%<2>apple%banana%<3>English% | <1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch% | <[0-9]+> | End of Message | <1>,<1>,cat%dog%mouse%<2>,Katze%Hund%Maus%<2>,apple%banana%<3>,Apfel%Banane%<3> | Scenario Outline: DefragmentText correctly merges split messages from multiple TailFile Given a TailFile processor with the name "TailOne" and the "File to Tail" property set to "/tmp/input/test_file_one.log" @@ -48,24 +48,25 @@ Feature: DefragmentText can defragment fragmented data from TailFile And a TailFile processor with the name "TailTwo" and the "File to Tail" property set to "/tmp/input/test_file_two.log" And the "Initial Start Position" property of the TailTwo processor is set to "Beginning of File" And the "Input Delimiter" property of the TailTwo processor is set to "%" - And "TailTwo" processor is a start node And a file with filename "test_file_one.log" and content "<input_one>" is present in "/tmp/input" And a file with filename "test_file_two.log" and content "<input_two>" is present in "/tmp/input" And a DefragmentText processor with the "Pattern" property set to "<pattern>" And the "Pattern Location" property of the DefragmentText processor is set to "<pattern location>" + And DefragmentText is EVENT_DRIVEN And a PutFile processor with the name "SuccessPut" and the "Directory" property set to "/tmp/output" + And SuccessPut is EVENT_DRIVEN And the "success" relationship of the TailOne processor is connected to the DefragmentText And the "success" relationship of the TailTwo processor is connected to the DefragmentText And the "success" relationship of the DefragmentText processor is connected to the SuccessPut - + And SuccessPut's success relationship is auto-terminated When all instances start up - Then flowfiles with these contents are placed in the monitored directory in less than 60 seconds: "<success_flow_files>" + Then files with contents "<success_flow_files>" are placed in the "/tmp/output" directory in less than 60 seconds Examples: | input_one | input_two | pattern | pattern location | success_flow_files | | <1>cat%dog%mouse%<2>apple%banana%<3>English% | <1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch% | <[0-9]+> | Start of Message | <1>cat%dog%mouse%,<1>Katze%Hund%Maus%,<2>apple%banana%,<2>Apfel%Banane% | - | <1>cat%dog%mouse%<2>apple%banana%<3>English% | <1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch% | <[0-9]+> | End of Message | <1>,cat%dog%mouse%<2>,Katze%Hund%Maus%<2>,apple%banana%<3>,Apfel%Banane%<3> | + | <1>cat%dog%mouse%<2>apple%banana%<3>English% | <1>Katze%Hund%Maus%<2>Apfel%Banane%<3>Deutsch% | <[0-9]+> | End of Message | <1>,<1>,cat%dog%mouse%<2>,Katze%Hund%Maus%<2>,apple%banana%<3>,Apfel%Banane%<3> | Scenario Outline: DefragmentText merges split messages from a single TailFile Given a TailFile processor with the "File to Tail" property set to "/tmp/input/test_file.log" @@ -74,13 +75,15 @@ Feature: DefragmentText can defragment fragmented data from TailFile And a file with filename "test_file.log" and content "<input>" is present in "/tmp/input" And a DefragmentText processor with the "Pattern" property set to "<pattern>" And the "Pattern Location" property of the DefragmentText processor is set to "<pattern location>" + And DefragmentText is EVENT_DRIVEN And a PutFile processor with the name "SuccessPut" and the "Directory" property set to "/tmp/output" + And SuccessPut is EVENT_DRIVEN And the "success" relationship of the TailFile processor is connected to the DefragmentText And the "success" relationship of the DefragmentText processor is connected to the SuccessPut - + And SuccessPut's success relationship is auto-terminated When all instances start up - Then flowfiles with these contents are placed in the monitored directory in less than 30 seconds: "<success_flow_files>" + Then files with contents "<success_flow_files>" are placed in the "/tmp/output" directory in less than 30 seconds Examples: | input | pattern | pattern location | success_flow_files | diff --git a/extensions/standard-processors/tests/features/environment.py b/extensions/standard-processors/tests/features/environment.py index 225347536..e2908d510 100644 --- a/extensions/standard-processors/tests/features/environment.py +++ b/extensions/standard-processors/tests/features/environment.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import platform from minifi_test_framework.core.hooks import common_before_scenario from minifi_test_framework.core.hooks import common_after_scenario @@ -21,6 +21,13 @@ from minifi_test_framework.core.hooks import common_after_scenario # The common_before_scenario and common_after_scenario must be called for proper setup and tear down +def before_feature(context, feature): + if "x86_x64_only" in feature.tags: + is_x86 = platform.machine() in ("i386", "AMD64", "x86_64") + if not is_x86: + feature.skip("This feature is only x86/x64 compatible") + + def before_scenario(context, scenario): common_before_scenario(context, scenario) diff --git a/docker/test/integration/features/evaluate_json_path.feature b/extensions/standard-processors/tests/features/evaluate_json_path.feature similarity index 83% rename from docker/test/integration/features/evaluate_json_path.feature rename to extensions/standard-processors/tests/features/evaluate_json_path.feature index 71d36647c..d2c503363 100644 --- a/docker/test/integration/features/evaluate_json_path.feature +++ b/extensions/standard-processors/tests/features/evaluate_json_path.feature @@ -15,8 +15,6 @@ @CORE Feature: Writing JSON path query result to attribute or flow file using EvaluateJsonPath processor - Background: - Given the content of "/tmp/output" is monitored Scenario: Write query result to flow file Given a GetFile processor with the "Input Directory" property set to "/tmp/input" @@ -24,10 +22,13 @@ Feature: Writing JSON path query result to attribute or flow file using Evaluate And a EvaluateJsonPath processor with the "Destination" property set to "flowfile-content" And the "JsonPath" property of the EvaluateJsonPath processor is set to "$.books[*].title" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the EvaluateJsonPath And the "matched" relationship of the EvaluateJsonPath processor is connected to the PutFile + And PutFile's success relationship is auto-terminated + And PutFile's failure relationship is auto-terminated When the MiNiFi instance starts up - Then a flowfile with the JSON content "["The Great Gatsby","1984"]" is placed in the monitored directory in less than 10 seconds + Then a file with the JSON content "["The Great Gatsby","1984"]" is placed in the "/tmp/output" directory in less than 10 seconds Scenario: Write query result to attributes Given a GetFile processor with the "Input Directory" property set to "/tmp/input" @@ -39,12 +40,16 @@ Feature: Writing JSON path query result to attribute or flow file using Evaluate And the "author" property of the EvaluateJsonPath processor is set to "$.author" And the "release" property of the EvaluateJsonPath processor is set to "$.release" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor + And LogAttribute is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the EvaluateJsonPath And the "matched" relationship of the EvaluateJsonPath processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And PutFile's failure relationship is auto-terminated + And LogAttribute's success relationship is auto-terminated When the MiNiFi instance starts up - Then a flowfile with the JSON content "{"title": "1984", "author": null}" is placed in the monitored directory in less than 10 seconds + Then a file with the JSON content "{"title": "1984", "author": null}" is placed in the "/tmp/output" directory in less than 10 seconds And the Minifi logs contain the following message: "key:title value:1984" in less than 10 seconds - And the Minifi logs contain the following message: "key:author value:null" in less than 0 seconds - And the Minifi logs do not contain the following message: "key:release" after 0 seconds + And the Minifi logs contain the following message: "key:author value:null" in less than 1 seconds + And the Minifi logs do not contain the following message: "key:release" after 10 seconds diff --git a/docker/test/integration/features/fetch_modbus_tcp.feature b/extensions/standard-processors/tests/features/fetch_modbus_tcp.feature similarity index 79% rename from docker/test/integration/features/fetch_modbus_tcp.feature rename to extensions/standard-processors/tests/features/fetch_modbus_tcp.feature index d160a01f2..779fd7bd8 100644 --- a/docker/test/integration/features/fetch_modbus_tcp.feature +++ b/extensions/standard-processors/tests/features/fetch_modbus_tcp.feature @@ -17,15 +17,14 @@ @CORE Feature: Minifi C++ can act as a modbus tcp master - Background: - Given the content of "/tmp/output" is monitored - Scenario: MiNiFi can fetch data from a modbus slave Given a FetchModbusTcp processor - And a JsonRecordSetWriter controller service is set up with "One Line Per Object" output grouping + And a JsonRecordSetWriter controller service is set up and the "Output Grouping" property set to "One Line Per Object" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "Unit Identifier" property of the FetchModbusTcp processor is set to "255" And the "Record Set Writer" property of the FetchModbusTcp processor is set to "JsonRecordSetWriter" + And the "Hostname" property of the FetchModbusTcp processor is set to "diag-slave-tcp-${scenario_id}" And there is an accessible PLC with modbus enabled And PLC register has been set with h@52=123 command And PLC register has been set with h@5678/f=1.75 command @@ -40,6 +39,7 @@ Feature: Minifi C++ can act as a modbus tcp master And the "foo" property of the FetchModbusTcp processor is set to "holding-register:52" And the "bar" property of the FetchModbusTcp processor is set to "405678:REAL" And the "baz" property of the FetchModbusTcp processor is set to "4x4444:CHAR[6]" + And PutFile's success relationship is auto-terminated - When both instances start up - Then a flowfile with the JSON content "{"foo":123,"bar":1.75,"baz":["M", "i", "N", "i", "F", "i"]}" is placed in the monitored directory in less than 10 seconds + When the MiNiFi instance starts up + Then at least one file with the JSON content "{"foo":123,"bar":1.75,"baz":["M", "i", "N", "i", "F", "i"]}" is placed in the "/tmp/output" directory in less than 10 seconds diff --git a/docker/test/integration/features/file_system_operations.feature b/extensions/standard-processors/tests/features/file_system_operations.feature similarity index 67% rename from docker/test/integration/features/file_system_operations.feature rename to extensions/standard-processors/tests/features/file_system_operations.feature index 5979bca7d..d7341ddb0 100644 --- a/docker/test/integration/features/file_system_operations.feature +++ b/extensions/standard-processors/tests/features/file_system_operations.feature @@ -19,49 +19,57 @@ Feature: File system operations are handled by the GetFile, PutFile, ListFile an As a user of MiNiFi I need to have GetFile, PutFile, ListFile and FetchFile processors - Background: - Given the content of "/tmp/output" is monitored - Scenario: Get and put operations run in a simple flow Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "test" is present in "/tmp/input" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the PutFile + And PutFile's success relationship is auto-terminated When the MiNiFi instance starts up - Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 10 seconds Scenario: PutFile does not overwrite a file that already exists - Given a set of processors: - | type | name | uuid | - | GetFile | GetFile | 66259995-11da-41df-bff7-e262d5f6d7c9 | - | PutFile | PutFile_1 | 694423a0-26f3-4e95-9f9f-c03b6d6c189d | - | PutFile | PutFile_2 | f37e51e9-ad67-4e16-9dc6-ad853b0933e3 | - | PutFile | PutFile_3 | f37e51e9-ad67-4e16-9dc6-ad853b0933e4 | + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a PutFile processor with the name "PutFile_1" + And PutFile_1 is EVENT_DRIVEN + And a PutFile processor with the name "PutFile_2" + And PutFile_2 is EVENT_DRIVEN + And a PutFile processor with the name "PutFile_3" + And PutFile_3 is EVENT_DRIVEN - And these processor properties are set: + And these processor properties are set | processor name | property name | property value | | GetFile | Input Directory | /tmp/input | - | PutFile_1 | Input Directory | /tmp | + | PutFile_1 | Directory | /tmp | | PutFile_2 | Directory | /tmp | | PutFile_3 | Directory | /tmp/output | - And the processors are connected up as described here: + And the processors are connected up as described here | source name | relationship name | destination name | | GetFile | success | PutFile_1 | | PutFile_1 | success | PutFile_2 | | PutFile_2 | failure | PutFile_3 | + And PutFile_1's failure relationship is auto-terminated + And PutFile_2's success relationship is auto-terminated + And PutFile_3's success relationship is auto-terminated + And PutFile_3's failure relationship is auto-terminated + And a file with the content "test" is present in "/tmp/input" When the MiNiFi instance starts up - Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 10 seconds Scenario: List and fetch files from a directory in a simple flow Given a file with filename "test_file.log" and content "Test message" is present in "/tmp/input" And a file with filename "test_file2.log" and content "Another test message" is present in "/tmp/input" And a ListFile processor with the "Input Directory" property set to "/tmp/input" And a FetchFile processor + And FetchFile is EVENT_DRIVEN And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the ListFile processor is connected to the FetchFile And the "success" relationship of the FetchFile processor is connected to the PutFile + And PutFile's success relationship is auto-terminated When the MiNiFi instance starts up - Then two flowfiles with the contents "Test message" and "Another test message" are placed in the monitored directory in less than 30 seconds + Then files with contents "Test message" and "Another test message" are placed in the "/tmp/output" directory in less than 10 seconds diff --git a/docker/test/integration/features/hashcontent.feature b/extensions/standard-processors/tests/features/hashcontent.feature similarity index 83% rename from docker/test/integration/features/hashcontent.feature rename to extensions/standard-processors/tests/features/hashcontent.feature index 54cc607f7..8e24bf62b 100644 --- a/docker/test/integration/features/hashcontent.feature +++ b/extensions/standard-processors/tests/features/hashcontent.feature @@ -19,19 +19,19 @@ Feature: Hash value is added to Flowfiles by HashContent processor As a user of MiNiFi I need to have HashContent processor to calculate and add hash value - Background: - Given the content of "/tmp/output" is monitored - Scenario Outline: HashContent adds hash attribute to flowfiles Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content <content> is present in "/tmp/input" And a HashContent processor with the "Hash Attribute" property set to "hash" And the "Hash Algorithm" property of the HashContent processor is set to "<hash_algorithm>" + And HashContent is EVENT_DRIVEN And a LogAttribute processor + And LogAttribute is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the HashContent And the "success" relationship of the HashContent processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When the MiNiFi instance starts up - Then the Minifi logs contain the following message: "key:hash value:<hash_value>" in less than 60 seconds + Then the Minifi logs contain the following message: "key:hash value:<hash_value>" in less than 10 seconds Examples: | content | hash_algorithm | hash_value | @@ -39,27 +39,32 @@ Feature: Hash value is added to Flowfiles by HashContent processor | "test" | SHA1 | A94A8FE5CCB19BA61C4C0873D391E987982FBBD3 | | "coffee" | SHA256 | 37290D74AC4D186E3A8E5785D259D2EC04FAC91AE28092E7620EC8BC99E830AA | - Scenario: HashContent fails for an empty file if 'fail on empty' property is set to true Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And an empty file is present in "/tmp/input" And a HashContent processor with the "Hash Attribute" property set to "hash" And the "Hash Algorithm" property of the HashContent processor is set to "MD5" And the "Fail on empty" property of the HashContent processor is set to "true" + And HashContent is EVENT_DRIVEN And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the HashContent And the "failure" relationship of the HashContent processor is connected to the PutFile + And PutFile's success relationship is auto-terminated When the MiNiFi instance starts up - Then at least one empty flowfile is placed in the monitored directory in less than 10 seconds + Then at least one empty file is placed in the "/tmp/output" directory in less than 10 seconds - Scenario Outline: HashContent can use MD5 in FIPS mode + Scenario: HashContent can use MD5 in FIPS mode Given OpenSSL FIPS mode is enabled in MiNiFi And a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content apple is present in "/tmp/input" + And a file with the content "apple" is present in "/tmp/input" And a HashContent processor with the "Hash Attribute" property set to "hash" And the "Hash Algorithm" property of the HashContent processor is set to "MD5" + And HashContent is EVENT_DRIVEN And a LogAttribute processor + And LogAttribute is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the HashContent And the "success" relationship of the HashContent processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When the MiNiFi instance starts up - Then the Minifi logs contain the following message: "key:hash value:1F3870BE274F6C49B3E31A0C6728957F" in less than 60 seconds + Then the Minifi logs contain the following message: "key:hash value:1F3870BE274F6C49B3E31A0C6728957F" in less than 10 seconds diff --git a/docker/test/integration/features/network_listener.feature b/extensions/standard-processors/tests/features/network_listener.feature similarity index 83% rename from docker/test/integration/features/network_listener.feature rename to extensions/standard-processors/tests/features/network_listener.feature index fe659a669..4eb6f2bb1 100644 --- a/docker/test/integration/features/network_listener.feature +++ b/extensions/standard-processors/tests/features/network_listener.feature @@ -16,15 +16,14 @@ @CORE Feature: Minifi C++ can act as a network listener - Background: - Given the content of "/tmp/output" is monitored - Scenario: A TCP client can send messages to Minifi Given a ListenTCP processor And the "Listening Port" property of the ListenTCP processor is set to "10254" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a TCP client is set up to send a test TCP message to minifi And the "success" relationship of the ListenTCP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - When both instances start up - Then at least one flowfile with the content "test_tcp_message" is placed in the monitored directory in less than 60 seconds + When all instances start up + Then at least one file with the content "test_tcp_message" is placed in the "/tmp/output" directory in less than 20 seconds diff --git a/docker/test/integration/features/split_json.feature b/extensions/standard-processors/tests/features/split_json.feature similarity index 70% rename from docker/test/integration/features/split_json.feature rename to extensions/standard-processors/tests/features/split_json.feature index 607020b88..98ea1fd72 100644 --- a/docker/test/integration/features/split_json.feature +++ b/extensions/standard-processors/tests/features/split_json.feature @@ -15,28 +15,30 @@ @CORE Feature: Splitting JSON content using SplitJson processor - Background: - Given the content of "/tmp/output" is monitored Scenario: Split multiple query results to separate flow files Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with filename "test_file.json" and content "{"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}, {"name": "Sales", "employees": null}]}}" is present in "/tmp/input" And a SplitJson processor with the "JsonPath Expression" property set to "$.company.departments[*].employees" And the "Null Value Representation" property of the SplitJson processor is set to "the string 'null'" + And SplitJson is EVENT_DRIVEN And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And LogAttribute is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the SplitJson And the "split" relationship of the SplitJson processor is connected to the PutFile And the "original" relationship of the SplitJson processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When the MiNiFi instance starts up - Then at least one flowfile with the content "["Alice","Bob"]" is placed in the monitored directory in less than 10 seconds - And at least one flowfile with the content "Dave" is placed in the monitored directory in less than 0 seconds - And at least one flowfile with the content "null" is placed in the monitored directory in less than 0 seconds - And at least one flowfile with the content "{"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}, {"name": "Sales", "employees": null}]}}" is placed in the monitored directory in less than 0 seconds + Then at least one file with the content "["Alice","Bob"]" is placed in the "/tmp/output" directory in less than 10 seconds + And at least one file with the content "Dave" is placed in the "/tmp/output" directory in less than 1 seconds + And at least one file with the content "null" is placed in the "/tmp/output" directory in less than 1 seconds + And at least one file with the content "{"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}, {"name": "Sales", "employees": null}]}}" is placed in the "/tmp/output" directory in less than 1 seconds And the Minifi logs contain the following message: "key:fragment.count value:3" in less than 3 seconds - And the Minifi logs contain the following message: "key:fragment.index value:0" in less than 0 seconds - And the Minifi logs contain the following message: "key:fragment.index value:1" in less than 0 seconds - And the Minifi logs contain the following message: "key:fragment.index value:2" in less than 0 seconds - And the Minifi logs contain the following message: "key:fragment.identifier value:" in less than 0 seconds - And the Minifi logs contain the following message: "key:segment.original.filename value:" in less than 0 seconds + And the Minifi logs contain the following message: "key:fragment.index value:0" in less than 1 seconds + And the Minifi logs contain the following message: "key:fragment.index value:1" in less than 1 seconds + And the Minifi logs contain the following message: "key:fragment.index value:2" in less than 1 seconds + And the Minifi logs contain the following message: "key:fragment.identifier value:" in less than 1 seconds + And the Minifi logs contain the following message: "key:segment.original.filename value:" in less than 1 seconds diff --git a/extensions/standard-processors/tests/features/steps/diag_slave_container.py b/extensions/standard-processors/tests/features/steps/diag_slave_container.py new file mode 100644 index 000000000..a168c7325 --- /dev/null +++ b/extensions/standard-processors/tests/features/steps/diag_slave_container.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +from textwrap import dedent + +from minifi_test_framework.containers.container import Container +from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder +from minifi_test_framework.core.helpers import wait_for_condition +from minifi_test_framework.core.minifi_test_context import MinifiTestContext + + +class DiagSlave(Container): + def __init__(self, test_context: MinifiTestContext): + dockerfile = dedent("""\ +FROM panterdsd/diagslave:latest +RUN pip install modbus-cli +ENV PROTOCOL=tcp +""") + + builder = DockerImageBuilder( + image_tag="minifi-diag-slave-tcp:latest", + dockerfile_content=dockerfile + ) + builder.build() + + super().__init__("minifi-diag-slave-tcp:latest", f"diag-slave-tcp-{test_context.scenario_id}", test_context.network) + + def deploy(self): + super().deploy() + finished_str = "Server started up successfully." + return wait_for_condition( + condition=lambda: finished_str in self.get_logs(), + timeout_seconds=5, + bail_condition=lambda: self.exited, + context=None + ) + + def set_value_on_plc_with_modbus(self, modbus_cmd): + (code, output) = self.exec_run(["modbus", "localhost", modbus_cmd]) + logging.info("Modbus command '%s' output: %s", modbus_cmd, output) + return code == 0 diff --git a/extensions/standard-processors/tests/features/steps/steps.py b/extensions/standard-processors/tests/features/steps/steps.py index a53631e2e..12a28eaa8 100644 --- a/extensions/standard-processors/tests/features/steps/steps.py +++ b/extensions/standard-processors/tests/features/steps/steps.py @@ -23,6 +23,8 @@ from minifi_test_framework.steps import core_steps # noqa: F401 from minifi_test_framework.steps import flow_building_steps # noqa: F401 from minifi_test_framework.core.minifi_test_context import MinifiTestContext from syslog_container import SyslogContainer +from diag_slave_container import DiagSlave +from tcp_client_container import TcpClientContainer @step("a Syslog client with TCP protocol is setup to send logs to minifi") @@ -33,3 +35,19 @@ def step_impl(context: MinifiTestContext): @step("a Syslog client with UDP protocol is setup to send logs to minifi") def step_impl(context): context.containers["syslog-udp"] = SyslogContainer("udp", context) + + +@step('there is an accessible PLC with modbus enabled') +def step_impl(context): + modbus_container = context.containers["diag-slave-tcp"] = DiagSlave(context) + assert modbus_container.deploy() + + +@step('PLC register has been set with {modbus_cmd} command') +def step_impl(context, modbus_cmd): + assert context.containers["diag-slave-tcp"].set_value_on_plc_with_modbus(modbus_cmd) or context.containers["diag-slave-tcp"].log_app_output() + + +@step('a TCP client is set up to send a test TCP message to minifi') +def step_impl(context): + context.containers["tcp-client"] = TcpClientContainer(context) diff --git a/extensions/standard-processors/tests/features/steps/tcp_client_container.py b/extensions/standard-processors/tests/features/steps/tcp_client_container.py new file mode 100644 index 000000000..d3f02d16a --- /dev/null +++ b/extensions/standard-processors/tests/features/steps/tcp_client_container.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from minifi_test_framework.containers.container import Container +from minifi_test_framework.core.helpers import wait_for_condition +from minifi_test_framework.core.minifi_test_context import MinifiTestContext + + +class TcpClientContainer(Container): + def __init__(self, test_context: MinifiTestContext): + cmd = ( + "/bin/sh -c 'apk add netcat-openbsd && " + "echo TCP client container started; " + "while true; do echo test_tcp_message | " + f"nc minifi-primary-{test_context.scenario_id} 10254; " + "sleep 1; done'" + ) + super().__init__("alpine:3.17.3", f"tcp-client-{test_context.scenario_id}", test_context.network, cmd) + + def deploy(self): + super().deploy() + finished_str = "TCP client container started" + return wait_for_condition( + condition=lambda: finished_str in self.get_logs(), + timeout_seconds=5, + bail_condition=lambda: self.exited, + context=None + )
