This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit de792c6fbed8c5b21483a0bdf8d6d3bb3762a863 Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Jul 16 16:00:22 2024 +0000 MINIFICPP-2369 Test parameters with expression language Closes #1809 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/cluster/ContainerStore.py | 3 ++ .../test/integration/cluster/DockerTestCluster.py | 3 ++ .../cluster/containers/FlowContainer.py | 17 +++++++ .../cluster/containers/MinifiContainer.py | 2 +- .../features/MiNiFi_integration_test_driver.py | 3 ++ .../features/core_functionality.feature | 31 +++++++++++++ docker/test/integration/features/steps/steps.py | 17 +++++++ .../Minifi_flow_json_serializer.py | 21 ++++++++- .../Minifi_flow_yaml_serializer.py | 53 +++++++++++++--------- 9 files changed, 127 insertions(+), 23 deletions(-) diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index bd775ecea..2f051bf57 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -364,6 +364,9 @@ class ContainerStore: def set_yaml_in_minifi(self): self.minifi_options.config_format = "yaml" + def set_json_in_minifi(self): + self.minifi_options.config_format = "json" + def set_controller_socket_properties_in_minifi(self): self.minifi_options.enable_controller_socket = True diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index c54fb8ab2..581f439cf 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -122,6 +122,9 @@ class DockerTestCluster: def set_yaml_in_minifi(self): self.container_store.set_yaml_in_minifi() + def set_json_in_minifi(self): + self.container_store.set_json_in_minifi() + def set_controller_socket_properties_in_minifi(self): self.container_store.set_controller_socket_properties_in_minifi() diff --git a/docker/test/integration/cluster/containers/FlowContainer.py b/docker/test/integration/cluster/containers/FlowContainer.py index ff040f4bd..5b5dabe71 100644 --- a/docker/test/integration/cluster/containers/FlowContainer.py +++ b/docker/test/integration/cluster/containers/FlowContainer.py @@ -17,6 +17,12 @@ from .Container import Container +class Parameter: + def __init__(self, name, value): + self.name = name + self.value = value + + class FlowContainer(Container): def __init__(self, feature_context, config_dir, name, engine, vols, network, image_store, command): super().__init__(feature_context=feature_context, @@ -29,6 +35,8 @@ class FlowContainer(Container): self.start_nodes = [] self.config_dir = config_dir self.controllers = [] + self.parameter_contexts = dict() + self.parameter_context_name = None def get_start_nodes(self): return self.start_nodes @@ -38,3 +46,12 @@ class FlowContainer(Container): def add_controller(self, controller): self.controllers.append(controller) + + def add_parameter_to_flow_config(self, parameter_context_name, parameter_name, parameter_value): + if parameter_context_name in self.parameter_contexts: + self.parameter_contexts[parameter_context_name].append(Parameter(parameter_name, parameter_value)) + else: + self.parameter_contexts[parameter_context_name] = [Parameter(parameter_name, parameter_value)] + + def set_parameter_context_name(self, parameter_context_name): + self.parameter_context_name = parameter_context_name diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index db1dbd191..bbe56a879 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -84,7 +84,7 @@ class MinifiContainer(FlowContainer): serializer = Minifi_flow_json_serializer() else: assert False, "Invalid flow configuration format: {}".format(self.options.config_format) - test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers) + test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers, self.parameter_context_name, self.parameter_contexts) logging.info('Using generated flow config yml:\n%s', test_flow_yaml) absolute_flow_config_path = os.path.join(self.container_specific_config_dir, "config.yml") with open(absolute_flow_config_path, 'wb') as config_file: diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index a80a50ebd..34acc0096 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -432,6 +432,9 @@ class MiNiFi_integration_test: def set_yaml_in_minifi(self): self.cluster.set_yaml_in_minifi() + def set_json_in_minifi(self): + self.cluster.set_json_in_minifi() + def set_controller_socket_properties_in_minifi(self): self.cluster.set_controller_socket_properties_in_minifi() diff --git a/docker/test/integration/features/core_functionality.feature b/docker/test/integration/features/core_functionality.feature index 8d3352bd4..ef4bc5df2 100644 --- a/docker/test/integration/features/core_functionality.feature +++ b/docker/test/integration/features/core_functionality.feature @@ -71,6 +71,7 @@ Feature: Core flow functionalities Then the peak memory usage of the agent is more than 130 MB in less than 20 seconds And the memory usage of the agent decreases to 70% peak usage in less than 20 seconds + @CORE Scenario: Metrics can be logged Given a GenerateFlowFile processor And log metrics publisher is enabled in MiNiFi @@ -84,3 +85,33 @@ Feature: Core flow functionalities And the Minifi logs contain the following message: ' "size": "0"' in less than 2 seconds And the Minifi logs contain the following message: ' },' in less than 2 seconds And the Minifi logs contain the following message: ' "provenance": {' in less than 2 seconds + + @CORE + Scenario Outline: DefragmentText correctly merges split messages from TailFile multiple file tail-mode + Given a MiNiFi CPP server with <config_format> config + And parameter context name is set to 'my-context' + And a non-sensitive parameter in the flow config called 'FILENAME' with the value '${filename}' in the parameter context 'my-context' + And a non-sensitive parameter in the flow config called 'FILENAME_IN_EXPRESSION' with the value 'filename' in the parameter context 'my-context' + And a non-sensitive parameter in the flow config called 'FILE_INPUT_PATH' with the value '/tmp/input' in the parameter context 'my-context' + And a non-sensitive parameter in the flow config called 'FILE_OUTPUT_UPPER_PATH_ATTR' with the value 'upper_out_path_attr' in the parameter context 'my-context' + And a GetFile processor with the "Input Directory" property set to "#{FILE_INPUT_PATH}" + And a file with filename "test_file_name" and content "test content" is present in "/tmp/input" + And a UpdateAttribute processor with the "expr-lang-filename" property set to "#{FILENAME}" + And the "is-upper-correct" property of the UpdateAttribute processor is set to "${#{FILENAME_IN_EXPRESSION}:toUpper():equals('TEST_FILE_NAME')}" + And the "upper_out_path_attr" property of the UpdateAttribute processor is set to "/TMP/OUTPUT" + And a PutFile processor with the "Directory" property set to "${#{FILE_OUTPUT_UPPER_PATH_ATTR}:toLower()}" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + + And the "success" relationship of the GetFile processor is connected to the UpdateAttribute + And the "success" relationship of the UpdateAttribute processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When all instances start up + + Then the Minifi logs contain the following message: "key:expr-lang-filename value:test_file_name" in less than 60 seconds + And the Minifi logs contain the following message: "key:is-upper-correct value:true" in less than 60 seconds + + Examples: + | config_format | + | yaml | + | json | diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 47d27430b..b26cd1c61 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1106,6 +1106,11 @@ def step_impl(context): context.test.set_yaml_in_minifi() +@given(u'a MiNiFi CPP server with json config') +def step_impl(context): + context.test.set_json_in_minifi() + + # MiNiFi controller @given(u'controller socket properties are set up') def step_impl(context): @@ -1288,3 +1293,15 @@ def step_impl(context, install_mode): @given("the example MiNiFi python processors are present") def step_impl(context): context.test.enable_example_minifi_python_processors() + + +@given("a non-sensitive parameter in the flow config called '{parameter_name}' with the value '{parameter_value}' in the parameter context '{parameter_context_name}'") +def step_impl(context, parameter_context_name, parameter_name, parameter_value): + container = context.test.acquire_container(context=context, name='minifi-cpp-flow', engine='minifi-cpp') + container.add_parameter_to_flow_config(parameter_context_name, parameter_name, parameter_value) + + +@given("parameter context name is set to '{parameter_context_name}'") +def step_impl(context, parameter_context_name): + container = context.test.acquire_container(context=context, name='minifi-cpp-flow', engine='minifi-cpp') + container.set_parameter_context_name(parameter_context_name) diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py index 422a2c755..e1769f632 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -23,8 +23,9 @@ from ..core.Funnel import Funnel class Minifi_flow_json_serializer: - def serialize(self, start_nodes, controllers): + def serialize(self, start_nodes, controllers, parameter_context_name: str, parameter_contexts): res = { + 'parameterContexts': [], 'rootGroup': { 'name': 'MiNiFi Flow', 'processors': [], @@ -36,6 +37,24 @@ class Minifi_flow_json_serializer: } visited = [] + if parameter_context_name: + res['rootGroup']['parameterContextName'] = parameter_context_name + + if parameter_contexts: + for context_name in parameter_contexts: + res['parameterContexts'].append({ + 'identifier': str(uuid.uuid4()), + 'name': context_name, + 'parameters': [] + }) + for parameter in parameter_contexts[context_name]: + res['parameterContexts'][-1]['parameters'].append({ + 'name': parameter.name, + 'description': '', + 'sensitive': False, + 'value': parameter.value + }) + for node in start_nodes: self.serialize_node(node, res['rootGroup'], visited) diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py index a82e11fbc..ff8b37de8 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py @@ -23,9 +23,37 @@ from ..core.Funnel import Funnel class Minifi_flow_yaml_serializer: - def serialize(self, start_nodes, controllers): - res = None - visited = None + def serialize(self, start_nodes, controllers, parameter_context_name: str, parameter_contexts): + res = { + 'Flow Controller': { + 'name': 'MiNiFi Flow' + }, + 'Processors': [], + 'Funnels': [], + 'Connections': [], + 'Remote Processing Groups': [], + 'Controller Services': [] + } + visited = [] + + if parameter_context_name: + res['Parameter Context Name'] = parameter_context_name + + if parameter_contexts: + res['Parameter Contexts'] = [] + for context_name in parameter_contexts: + res['Parameter Contexts'].append({ + 'id': str(uuid.uuid4()), + 'name': context_name, + 'Parameters': [] + }) + for parameter in parameter_contexts[context_name]: + res['Parameter Contexts'][-1]['Parameters'].append({ + 'name': parameter.name, + 'description': '', + 'sensitive': False, + 'value': parameter.value + }) for node in start_nodes: res, visited = self.serialize_node(node, res, visited) @@ -35,24 +63,7 @@ class Minifi_flow_yaml_serializer: return yaml.dump(res, default_flow_style=False) - def serialize_node(self, connectable, root=None, visited=None): - if visited is None: - visited = [] - - if root is None: - res = { - 'Flow Controller': { - 'name': 'MiNiFi Flow' - }, - 'Processors': [], - 'Funnels': [], - 'Connections': [], - 'Remote Processing Groups': [], - 'Controller Services': [] - } - else: - res = root - + def serialize_node(self, connectable, res=None, visited=None): visited.append(connectable) if hasattr(connectable, 'name'):
