This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1cc6d35dd229d851c8289caa18de4321ff44a315 Author: Gabor Gyimesi <[email protected]> AuthorDate: Fri Apr 25 16:57:56 2025 +0200 MINIFICPP-2539 Implement python NiFi API state manager Closes #1944 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/cluster/ContainerStore.py | 3 + .../test/integration/cluster/DockerTestCluster.py | 3 + docker/test/integration/cluster/ImageStore.py | 57 ++++++++++------ .../cluster/containers/MinifiContainer.py | 5 +- .../features/MiNiFi_integration_test_driver.py | 3 + docker/test/integration/features/python.feature | 32 ++++++--- docker/test/integration/features/steps/steps.py | 7 +- .../minifi/processors/TestStateManager.py | 25 +++++++ .../resources/python/TestStateManager.py | 48 +++++++++++++ extensions/python/PYTHON.md | 1 + .../pythonprocessors/nifiapi/componentstate.py | 79 ++++++++++++++++++++++ .../python/pythonprocessors/nifiapi/properties.py | 5 +- extensions/python/types/PyStateManager.cpp | 51 ++++++++++++++ extensions/python/types/PyStateManager.h | 2 + 14 files changed, 286 insertions(+), 35 deletions(-) diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index c49314a7a..4939ad098 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -382,6 +382,9 @@ class ContainerStore: def remove_python_requirements_txt_in_minifi(self): self.minifi_options.remove_python_requirements_txt = True + def use_nifi_python_processors_without_dependencies_in_minifi(self): + self.minifi_options.use_nifi_python_processors_without_dependencies = True + def set_yaml_in_minifi(self): self.minifi_options.config_format = "yaml" diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 7e7c6c549..7d7e7cb93 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ 
b/docker/test/integration/cluster/DockerTestCluster.py @@ -129,6 +129,9 @@ class DockerTestCluster: def remove_python_requirements_txt_in_minifi(self): self.container_store.remove_python_requirements_txt_in_minifi() + def use_nifi_python_processors_without_dependencies_in_minifi(self): + self.container_store.use_nifi_python_processors_without_dependencies_in_minifi() + def set_yaml_in_minifi(self): self.container_store.set_yaml_in_minifi() diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index d10be12e4..6f3218452 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -23,7 +23,7 @@ from textwrap import dedent import os -class PythonOptions: +class PythonWithDependenciesOptions: REQUIREMENTS_FILE = 0 SYSTEM_INSTALLED_PACKAGES = 1 INLINE_DEFINED_PACKAGES = 2 @@ -50,11 +50,13 @@ class ImageStore: elif container_engine == "minifi-cpp-with-example-python-processors": image = self.__build_minifi_cpp_image_with_example_minifi_python_processors() elif container_engine == "minifi-cpp-nifi-python": - image = self.__build_minifi_cpp_image_with_nifi_python_processors(PythonOptions.REQUIREMENTS_FILE) + image = self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.REQUIREMENTS_FILE) elif container_engine == "minifi-cpp-nifi-python-system-python-packages": - image = self.__build_minifi_cpp_image_with_nifi_python_processors(PythonOptions.SYSTEM_INSTALLED_PACKAGES) + image = self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.SYSTEM_INSTALLED_PACKAGES) elif container_engine == "minifi-cpp-nifi-with-inline-python-dependencies": - image = self.__build_minifi_cpp_image_with_nifi_python_processors(PythonOptions.INLINE_DEFINED_PACKAGES) + image = 
self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.INLINE_DEFINED_PACKAGES) + elif container_engine == "minifi-cpp-nifi-with-python-without-dependencies": + image = self.__build_minifi_cpp_image_with_nifi_python_processors() elif container_engine == "http-proxy": image = self.__build_http_proxy_image() elif container_engine == "postgresql-server": @@ -129,7 +131,7 @@ class ImageStore: return self.__build_image(dockerfile) - def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option): + def __build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(self, python_option): parse_document_url = "https://raw.githubusercontent.com/apache/nifi-python-extensions/refs/heads/main/src/extensions/chunking/ParseDocument.py" chunk_document_url = "https://raw.githubusercontent.com/apache/nifi-python-extensions/refs/heads/main/src/extensions/chunking/ChunkDocument.py" pip3_install_command = "" @@ -141,14 +143,14 @@ class ImageStore: # d: Delete line parse_document_sed_cmd = 'sed -i "/class ProcessorDetails:/,/^$/{/^\\s*dependencies\\s*=/,/\\]\\s*$/d}" /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ParseDocument.py && \\' chunk_document_sed_cmd = 'sed -i "/class ProcessorDetails:/,/^$/{/^\\s*dependencies\\s*=/,/\\]\\s*$/d}" /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ChunkDocument.py && \\' - if python_option == PythonOptions.SYSTEM_INSTALLED_PACKAGES: + if python_option == PythonWithDependenciesOptions.SYSTEM_INSTALLED_PACKAGES: if not MinifiContainer.MINIFI_TAG_PREFIX or "bookworm" in MinifiContainer.MINIFI_TAG_PREFIX or "noble" in MinifiContainer.MINIFI_TAG_PREFIX: additional_cmd = "RUN pip3 install --break-system-packages 'langchain<=0.17.0'" else: additional_cmd = "RUN pip3 install 'langchain<=0.17.0'" - elif python_option == PythonOptions.REQUIREMENTS_FILE: + elif python_option == PythonWithDependenciesOptions.REQUIREMENTS_FILE: 
requirements_install_command = "echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\" - elif python_option == PythonOptions.INLINE_DEFINED_PACKAGES: + elif python_option == PythonWithDependenciesOptions.INLINE_DEFINED_PACKAGES: parse_document_sed_cmd = parse_document_sed_cmd[:-2] + ' sed -i "s/langchain==[0-9.]\\+/langchain<=0.17.0/" /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ParseDocument.py && \\' chunk_document_sed_cmd = 'sed -i "s/\\[\\\'langchain\\\'\\]/\\[\\\'langchain<=0.17.0\\\'\\]/" /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ChunkDocument.py && \\' if not MinifiContainer.MINIFI_TAG_PREFIX: @@ -159,18 +161,6 @@ class ImageStore: {pip3_install_command} {additional_cmd} USER minificpp - COPY RotatingForwarder.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py - COPY SpecialPropertyTypeChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SpecialPropertyTypeChecker.py - COPY ProcessContextInterfaceChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py - COPY CreateFlowFile.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py - COPY FailureWithAttributes.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py - COPY subtractutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py - COPY RelativeImporterProcessor.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py - COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py - COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py - COPY FailureWithContent.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py - COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py - COPY SetRecordField.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SetRecordField.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\ @@ -189,6 +179,32 @@ class ImageStore: requirements_install_command=requirements_install_command, parse_document_sed_cmd=parse_document_sed_cmd, chunk_document_sed_cmd=chunk_document_sed_cmd)) + return self.__build_image(dockerfile) + + def __build_minifi_cpp_image_with_nifi_python_processors(self): + pip3_install_command = "" + if not MinifiContainer.MINIFI_TAG_PREFIX: + pip3_install_command = "RUN apk --update --no-cache add py3-pip" + dockerfile = dedent("""\ + FROM {base_image} + USER root + {pip3_install_command} + USER minificpp + COPY RotatingForwarder.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py + COPY SpecialPropertyTypeChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SpecialPropertyTypeChecker.py + COPY ProcessContextInterfaceChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py + COPY CreateFlowFile.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py + COPY FailureWithAttributes.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py + COPY subtractutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py + COPY RelativeImporterProcessor.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py + COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py + COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py + COPY FailureWithContent.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py + COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py + COPY SetRecordField.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SetRecordField.py + COPY TestStateManager.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TestStateManager.py + RUN python3 -m venv /opt/minifi/minifi-current/venv + """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, pip3_install_command=pip3_install_command)) def build_full_python_resource_path(resource): return os.path.join(self.test_dir, "resources", "python", resource) @@ -206,6 +222,7 @@ class ImageStore: build_full_python_resource_path("FailureWithContent.py"), build_full_python_resource_path("TransferToOriginal.py"), build_full_python_resource_path("SetRecordField.py"), + build_full_python_resource_path("TestStateManager.py"), ]) def __build_http_proxy_image(self): diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index 10bc2e84f..381388633 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -37,6 +37,7 @@ class MinifiOptions: self.use_nifi_python_processors_with_virtualenv = False self.use_nifi_python_processors_with_virtualenv_packages_installed = False self.remove_python_requirements_txt = False + 
self.use_nifi_python_processors_without_dependencies = False self.config_format = "json" self.use_flow_config_from_url = False self.set_ssl_context_properties = False @@ -154,7 +155,7 @@ class MinifiContainer(FlowContainer): f.write("controller.socket.port=9998\n") f.write("controller.socket.local.any.interface=false\n") - if self.options.use_nifi_python_processors_with_virtualenv or self.options.remove_python_requirements_txt: + if self.options.use_nifi_python_processors_with_virtualenv or self.options.remove_python_requirements_txt or self.options.use_nifi_python_processors_without_dependencies: f.write("nifi.python.virtualenv.directory=/opt/minifi/minifi-current/venv\n") elif self.options.use_nifi_python_processors_with_virtualenv_packages_installed: f.write("nifi.python.virtualenv.directory=/opt/minifi/minifi-current/venv-with-langchain\n") @@ -193,6 +194,8 @@ class MinifiContainer(FlowContainer): image = self.image_store.get_image('minifi-cpp-nifi-python') elif self.options.remove_python_requirements_txt: image = self.image_store.get_image('minifi-cpp-nifi-with-inline-python-dependencies') + elif self.options.use_nifi_python_processors_without_dependencies: + image = self.image_store.get_image('minifi-cpp-nifi-with-python-without-dependencies') else: image = 'apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index fbc78112d..bb3a4090a 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -392,6 +392,9 @@ class MiNiFi_integration_test: def remove_python_requirements_txt_in_minifi(self): self.cluster.remove_python_requirements_txt_in_minifi() + def use_nifi_python_processors_without_dependencies_in_minifi(self): + 
self.cluster.use_nifi_python_processors_without_dependencies_in_minifi() + def set_yaml_in_minifi(self): self.cluster.set_yaml_in_minifi() diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 5c63a4e31..c9791f31a 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -76,7 +76,7 @@ Feature: MiNiFi can use python processors in its flows And the "Chunk Overlap" property of the ChunkDocument processor is set to "3" And a PutFile processor with the "Directory" property set to "/tmp/output" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And python is installed on the MiNiFi agent <python_install_mode> + And python with langchain is installed on the MiNiFi agent <python_install_mode> And the "success" relationship of the GetFile processor is connected to the ParseDocument And the "success" relationship of the ParseDocument processor is connected to the ChunkDocument @@ -101,7 +101,7 @@ Feature: MiNiFi can use python processors in its flows And a LogAttribute processor with the "FlowFiles To Log" property set to "0" And the "space" relationship of the CreateFlowFile processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent When the MiNiFi instance starts up Then a flowfile with the content "Hello World!" 
is placed in the monitored directory in less than 10 seconds And the Minifi logs contain the following message: "key:filename value:" in less than 60 seconds @@ -116,7 +116,7 @@ Feature: MiNiFi can use python processors in its flows And a file with filename "test_file4.log" and content "test_data_four" is present in "/tmp/input" And a RotatingForwarder processor And a PutFile processor with the "Directory" property set to "/tmp/output" - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GetFile processor is connected to the RotatingForwarder And the "first" relationship of the RotatingForwarder processor is connected to the PutFile @@ -133,7 +133,7 @@ Feature: MiNiFi can use python processors in its flows Given a GenerateFlowFile processor with the "File Size" property set to "0B" And a SpecialPropertyTypeChecker processor And a PutFile processor with the "Directory" property set to "/tmp/output" - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And a SSL context service is set up for the following processor: "SpecialPropertyTypeChecker" And the "success" relationship of the GenerateFlowFile processor is connected to the SpecialPropertyTypeChecker @@ -148,7 +148,7 @@ Feature: MiNiFi can use python processors in its flows Given a GenerateFlowFile processor with the "File Size" property set to "0B" And a ProcessContextInterfaceChecker processor And a PutFile processor with the "Directory" property set to "/tmp/output" - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GenerateFlowFile processor is connected to the ProcessContextInterfaceChecker And the "myrelationship" relationship of the ProcessContextInterfaceChecker processor is connected to the 
PutFile @@ -164,7 +164,7 @@ Feature: MiNiFi can use python processors in its flows And the "error.message" property of the UpdateAttribute processor is set to "Old error" And a FailureWithAttributes processor And a LogAttribute processor - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute And the "success" relationship of the UpdateAttribute processor is connected to the FailureWithAttributes @@ -180,7 +180,7 @@ Feature: MiNiFi can use python processors in its flows Given a GenerateFlowFile processor with the "File Size" property set to "0B" And a RelativeImporterProcessor processor And a PutFile processor with the "Directory" property set to "/tmp/output" - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GenerateFlowFile processor is connected to the RelativeImporterProcessor And the "success" relationship of the RelativeImporterProcessor processor is connected to the PutFile @@ -194,7 +194,7 @@ Feature: MiNiFi can use python processors in its flows Given a CreateNothing processor And a PutFile processor with the "Directory" property set to "/tmp/output" And the "success" relationship of the CreateNothing processor is connected to the PutFile - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent When the MiNiFi instance starts up Then no files are placed in the monitored directory in 10 seconds of running time And the Minifi logs do not contain the following message: "Caught Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 seconds @@ -203,7 +203,7 @@ Feature: MiNiFi can use python processors in its flows Scenario: NiFi native python processor cannot specify 
content of failure result Given a GenerateFlowFile processor with the "File Size" property set to "0B" And a FailureWithContent processor - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GenerateFlowFile processor is connected to the FailureWithContent @@ -215,7 +215,7 @@ Feature: MiNiFi can use python processors in its flows Scenario: NiFi native python processor cannot transfer to original relationship Given a GenerateFlowFile processor with the "File Size" property set to "0B" And a TransferToOriginal processor - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GenerateFlowFile processor is connected to the TransferToOriginal @@ -234,7 +234,7 @@ Feature: MiNiFi can use python processors in its flows And a JsonRecordSetWriter controller service is set up with "Array" output grouping And a LogAttribute processor with the "FlowFiles To Log" property set to "0" And the "Log Payload" property of the LogAttribute processor is set to "true" - And python is installed on the MiNiFi agent with a pre-created virtualenv + And python virtualenv is installed on the MiNiFi agent And the "success" relationship of the GetFile processor is connected to the SetRecordField And the "success" relationship of the SetRecordField processor is connected to the LogAttribute @@ -246,3 +246,13 @@ Feature: MiNiFi can use python processors in its flows And the Minifi logs contain the following message: '[{"name":"Zoe"}]' in less than 5 seconds And the Minifi logs contain the following message: '[{"group":"group1","name":"Steve"}]' in less than 5 seconds And the Minifi logs contain the following message: '[{}]' in less than 5 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: MiNiFi C++ can use state manager commands in native NiFi python processors + 
Given a TestStateManager processor + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And the "success" relationship of the TestStateManager processor is connected to the LogAttribute + And python virtualenv is installed on the MiNiFi agent + When the MiNiFi instance starts up + Then the Minifi logs contain the following message: "key:state_key value:1" in less than 60 seconds + And the Minifi logs contain the following message: "key:state_key value:2" in less than 60 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index e9b45c2ab..76e91d59e 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1336,7 +1336,7 @@ def step_impl(context): # Python -@given("python is installed on the MiNiFi agent {install_mode}") +@given("python with langchain is installed on the MiNiFi agent {install_mode}") def step_impl(context, install_mode): if install_mode == "with required python packages": context.test.use_nifi_python_processors_with_system_python_packages_installed_in_minifi() @@ -1350,6 +1350,11 @@ def step_impl(context, install_mode): raise Exception("Unknown python install mode.") +@given("python virtualenv is installed on the MiNiFi agent") +def step_impl(context): + context.test.use_nifi_python_processors_without_dependencies_in_minifi() + + @given("the example MiNiFi python processors are present") def step_impl(context): context.test.enable_example_minifi_python_processors() diff --git a/docker/test/integration/minifi/processors/TestStateManager.py b/docker/test/integration/minifi/processors/TestStateManager.py new file mode 100644 index 000000000..629ea51f3 --- /dev/null +++ b/docker/test/integration/minifi/processors/TestStateManager.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class TestStateManager(Processor): + def __init__(self, context, schedule={'scheduling period': '1 sec'}): + super(TestStateManager, self).__init__( + context=context, + clazz='TestStateManager', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + schedule=schedule, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/TestStateManager.py b/docker/test/integration/resources/python/TestStateManager.py new file mode 100644 index 000000000..b5d657176 --- /dev/null +++ b/docker/test/integration/resources/python/TestStateManager.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.componentstate import Scope +from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult + + +class TestStateManager(FlowFileSource): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileSource'] + + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + description = '''A Python source processor that uses StateManager.''' + tags = ['text', 'test', 'python', 'source'] + + def __init__(self, **kwargs): + pass + + def getPropertyDescriptors(self): + return [] + + def create(self, context): + state_manager = context.getStateManager() + state = state_manager.getState(Scope.CLUSTER) + old_value = state.get("state_key") + if not old_value: + new_state = {'state_key': '1'} + state_manager.setState(new_state, Scope.CLUSTER) + elif old_value == '1': + new_state = {'state_key': '2'} + state_manager.replace(state, new_state, Scope.CLUSTER) + else: + state_manager.clear(Scope.CLUSTER) + + return FlowFileSourceResult(relationship='success', attributes=state.toMap(), contents='Output FlowFile Contents') diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md index d99ac654b..3dfbc5d7f 100644 --- a/extensions/python/PYTHON.md +++ b/extensions/python/PYTHON.md @@ -172,6 +172,7 @@ Due to some differences between the NiFi and MiNiFi C++ processors and implement - The interface of the `ProcessContext` class is a bit more limited in MiNiFi C++ compared to NiFi. The available methods in `ProcessContext` are `getProperty`, `getStateManager`, `getName` and `getProperties`. 
- Success relationship is always present in all Python processors even if custom relationships are defined in the Python processor with the `getRelationships` method. - MiNiFi C++ uses a single embedded Python interpreter for all Python processors, so the Python processors share the same Python interpreter. This means that the Python processors cannot have different Python versions or use different Python packages. The Python packages are installed on the system or in a single virtualenv that is shared by all Python processors. +- The state manager API is available with the same interface as in NiFi, but MiNiFi C++ uses transactional state management. Because of this, when the state is changed in a processor trigger, it cannot be read again in that same trigger due to the dirty read protection. The state can be read in the next trigger, after it has been committed at the end of the session. Also, MiNiFi C++ does not use clustering, so the scope parameter of the state operations is ignored, as the state is always local. ## Use Python processors from virtualenv diff --git a/extensions/python/pythonprocessors/nifiapi/componentstate.py b/extensions/python/pythonprocessors/nifiapi/componentstate.py new file mode 100644 index 000000000..f6cf7e6b7 --- /dev/null +++ b/extensions/python/pythonprocessors/nifiapi/componentstate.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Dict +from minifi_native import StateManager as CppFlowFile + + +class Scope(Enum): + CLUSTER = 1 + LOCAL = 2 + + +class StateMap: + def __init__(self, state_map: Dict[str, str]): + self.state_map = state_map if state_map is not None else {} + + def getStateVersion(self) -> int: + return 1 + + def get(self, key) -> str: + if key not in self.state_map: + return None + + return self.state_map[key] + + def toMap(self) -> Dict[str, str]: + return self.state_map + + +class StateManager: + """ + Python class wrapping the StateManager CPP implementation. 
+ """ + + def __init__(self, cpp_state_manager: CppFlowFile): + self.cpp_state_manager = cpp_state_manager + + def setState(self, state: Dict[str, str], scope: Scope) -> bool: + try: + return self.cpp_state_manager.set(state) + except Exception as exception: + raise StateException("Set state failed") from exception + + def getState(self, scope: Scope) -> StateMap: + try: + return StateMap(self.cpp_state_manager.get()) + except Exception as exception: + raise StateException("Get state failed") from exception + + def replace(self, old_state: StateMap, new_values: Dict[str, str], scope: Scope) -> bool: + try: + return self.cpp_state_manager.replace(old_state.toMap(), new_values) + except Exception as exception: + raise StateException("Replace state failed") from exception + + def clear(self, scope: Scope): + try: + self.cpp_state_manager.clear() + except Exception as exception: + raise StateException("Clear state failed") from exception + + +class StateException(Exception): + """ + Exception class for any exception produced by the operations in StateManager. 
+ """ diff --git a/extensions/python/pythonprocessors/nifiapi/properties.py b/extensions/python/pythonprocessors/nifiapi/properties.py index 996a0bece..e4f71a9b4 100644 --- a/extensions/python/pythonprocessors/nifiapi/properties.py +++ b/extensions/python/pythonprocessors/nifiapi/properties.py @@ -15,9 +15,10 @@ from enum import Enum from typing import List, Dict -from minifi_native import ProcessSession, StateManager, timePeriodStringToMilliseconds, dataSizeStringToBytes +from minifi_native import ProcessSession, timePeriodStringToMilliseconds, dataSizeStringToBytes from minifi_native import FlowFile as CppFlowFile from minifi_native import ProcessContext as CppProcessContext +from .componentstate import StateManager # This is a mock for NiFi's StandardValidators class methods, that return the property type equivalent in MiNiFi C++ if exists @@ -294,7 +295,7 @@ class ProcessContext: return PythonPropertyValue(self.cpp_context, property_name, property_value, expression_language_support, controller_service_definition) def getStateManager(self) -> StateManager: - return self.cpp_context.getStateManager() + return StateManager(self.cpp_context.getStateManager()) def getName(self) -> str: return self.cpp_context.getName() diff --git a/extensions/python/types/PyStateManager.cpp b/extensions/python/types/PyStateManager.cpp index 957e8aa51..0b0993a68 100644 --- a/extensions/python/types/PyStateManager.cpp +++ b/extensions/python/types/PyStateManager.cpp @@ -23,6 +23,8 @@ namespace org::apache::nifi::minifi::extensions::python { static PyMethodDef PyStateManager_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {"get", (PyCFunction) PyStateManager::get, METH_VARARGS, nullptr}, {"set", (PyCFunction) PyStateManager::set, METH_VARARGS, nullptr}, + {"clear", (PyCFunction) PyStateManager::clear, METH_VARARGS, nullptr}, + {"replace", (PyCFunction) PyStateManager::replace, METH_VARARGS, nullptr}, {} /* Sentinel */ }; @@ -92,6 +94,55 @@ PyObject* 
PyStateManager::get(PyStateManager* self, PyObject*) { } } +PyObject* PyStateManager::clear(PyStateManager* self, PyObject* /*args*/) { + if (!self->state_manager_) { + PyErr_SetString(PyExc_AttributeError, "tried reading state manager outside 'on_trigger'"); + return nullptr; + } + + self->state_manager_->clear(); + Py_RETURN_NONE; +} + +PyObject* PyStateManager::replace(PyStateManager* self, PyObject* args) { + if (!self->state_manager_) { + PyErr_SetString(PyExc_AttributeError, "tried reading state manager outside 'on_trigger'"); + return nullptr; + } + core::StateManager::State old_cpp_state; + + auto old_state = BorrowedDict::fromTuple(args, 0); + + auto old_state_keys = OwnedList(PyDict_Keys(old_state.get())); + for (size_t i = 0; i < old_state_keys.length(); ++i) { + BorrowedStr key{old_state_keys[i]}; + if (auto value = old_state[key.toUtf8String()]) { + BorrowedStr value_str{*value}; + old_cpp_state[key.toUtf8String()] = value_str.toUtf8String(); + } + } + + auto current_cpp_state = self->state_manager_->get(); + if ((!current_cpp_state && old_state_keys.length() > 0) || (current_cpp_state && old_cpp_state != *current_cpp_state)) { + return object::returnReference(false); + } + + core::StateManager::State new_cpp_state; + + auto new_python_state = BorrowedDict::fromTuple(args, 1); + + auto new_python_state_keys = OwnedList(PyDict_Keys(new_python_state.get())); + for (size_t i = 0; i < new_python_state_keys.length(); ++i) { + BorrowedStr key{new_python_state_keys[i]}; + if (auto value = new_python_state[key.toUtf8String()]) { + BorrowedStr value_str{*value}; + new_cpp_state[key.toUtf8String()] = value_str.toUtf8String(); + } + } + + return object::returnReference(self->state_manager_->set(new_cpp_state)); +} + PyTypeObject* PyStateManager::typeObject() { static OwnedObject PyStateManagerType{PyType_FromSpec(&PyStateManagerTypeSpec)}; return reinterpret_cast<PyTypeObject*>(PyStateManagerType.get()); diff --git a/extensions/python/types/PyStateManager.h 
b/extensions/python/types/PyStateManager.h index 7e0c71873..b762a23da 100644 --- a/extensions/python/types/PyStateManager.h +++ b/extensions/python/types/PyStateManager.h @@ -35,6 +35,8 @@ struct PyStateManager { static PyObject* set(PyStateManager* self, PyObject* args); static PyObject* get(PyStateManager* self, PyObject* args); + static PyObject* clear(PyStateManager* self, PyObject* args); + static PyObject* replace(PyStateManager* self, PyObject* args); static PyTypeObject* typeObject(); };
