This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1cc6d35dd229d851c8289caa18de4321ff44a315
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Fri Apr 25 16:57:56 2025 +0200

    MINIFICPP-2539 Implement python NiFi API state manager
    
    Closes #1944
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 docker/test/integration/cluster/ContainerStore.py  |  3 +
 .../test/integration/cluster/DockerTestCluster.py  |  3 +
 docker/test/integration/cluster/ImageStore.py      | 57 ++++++++++------
 .../cluster/containers/MinifiContainer.py          |  5 +-
 .../features/MiNiFi_integration_test_driver.py     |  3 +
 docker/test/integration/features/python.feature    | 32 ++++++---
 docker/test/integration/features/steps/steps.py    |  7 +-
 .../minifi/processors/TestStateManager.py          | 25 +++++++
 .../resources/python/TestStateManager.py           | 48 +++++++++++++
 extensions/python/PYTHON.md                        |  1 +
 .../pythonprocessors/nifiapi/componentstate.py     | 79 ++++++++++++++++++++++
 .../python/pythonprocessors/nifiapi/properties.py  |  5 +-
 extensions/python/types/PyStateManager.cpp         | 51 ++++++++++++++
 extensions/python/types/PyStateManager.h           |  2 +
 14 files changed, 286 insertions(+), 35 deletions(-)

diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index c49314a7a..4939ad098 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -382,6 +382,9 @@ class ContainerStore:
     def remove_python_requirements_txt_in_minifi(self):
         self.minifi_options.remove_python_requirements_txt = True
 
+    def use_nifi_python_processors_without_dependencies_in_minifi(self):
+        self.minifi_options.use_nifi_python_processors_without_dependencies = 
True
+
     def set_yaml_in_minifi(self):
         self.minifi_options.config_format = "yaml"
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index 7e7c6c549..7d7e7cb93 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -129,6 +129,9 @@ class DockerTestCluster:
     def remove_python_requirements_txt_in_minifi(self):
         self.container_store.remove_python_requirements_txt_in_minifi()
 
+    def use_nifi_python_processors_without_dependencies_in_minifi(self):
+        
self.container_store.use_nifi_python_processors_without_dependencies_in_minifi()
+
     def set_yaml_in_minifi(self):
         self.container_store.set_yaml_in_minifi()
 
diff --git a/docker/test/integration/cluster/ImageStore.py 
b/docker/test/integration/cluster/ImageStore.py
index d10be12e4..6f3218452 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -23,7 +23,7 @@ from textwrap import dedent
 import os
 
 
-class PythonOptions:
+class PythonWithDependenciesOptions:
     REQUIREMENTS_FILE = 0
     SYSTEM_INSTALLED_PACKAGES = 1
     INLINE_DEFINED_PACKAGES = 2
@@ -50,11 +50,13 @@ class ImageStore:
         elif container_engine == "minifi-cpp-with-example-python-processors":
             image = 
self.__build_minifi_cpp_image_with_example_minifi_python_processors()
         elif container_engine == "minifi-cpp-nifi-python":
-            image = 
self.__build_minifi_cpp_image_with_nifi_python_processors(PythonOptions.REQUIREMENTS_FILE)
+            image = 
self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.REQUIREMENTS_FILE)
         elif container_engine == 
"minifi-cpp-nifi-python-system-python-packages":
-            image = 
self.__build_minifi_cpp_image_with_nifi_python_processors(PythonOptions.SYSTEM_INSTALLED_PACKAGES)
+            image = 
self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.SYSTEM_INSTALLED_PACKAGES)
         elif container_engine == 
"minifi-cpp-nifi-with-inline-python-dependencies":
-            image = 
self.__build_minifi_cpp_image_with_nifi_python_processors(PythonOptions.INLINE_DEFINED_PACKAGES)
+            image = 
self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.INLINE_DEFINED_PACKAGES)
+        elif container_engine == 
"minifi-cpp-nifi-with-python-without-dependencies":
+            image = self.__build_minifi_cpp_image_with_nifi_python_processors()
         elif container_engine == "http-proxy":
             image = self.__build_http_proxy_image()
         elif container_engine == "postgresql-server":
@@ -129,7 +131,7 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
-    def __build_minifi_cpp_image_with_nifi_python_processors(self, 
python_option):
+    def 
__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(self, 
python_option):
         parse_document_url = 
"https://raw.githubusercontent.com/apache/nifi-python-extensions/refs/heads/main/src/extensions/chunking/ParseDocument.py";
         chunk_document_url = 
"https://raw.githubusercontent.com/apache/nifi-python-extensions/refs/heads/main/src/extensions/chunking/ChunkDocument.py";
         pip3_install_command = ""
@@ -141,14 +143,14 @@ class ImageStore:
         # d: Delete line
         parse_document_sed_cmd = 'sed -i "/class 
ProcessorDetails:/,/^$/{/^\\s*dependencies\\s*=/,/\\]\\s*$/d}" 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ParseDocument.py
 && \\'
         chunk_document_sed_cmd = 'sed -i "/class 
ProcessorDetails:/,/^$/{/^\\s*dependencies\\s*=/,/\\]\\s*$/d}" 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ChunkDocument.py
 && \\'
-        if python_option == PythonOptions.SYSTEM_INSTALLED_PACKAGES:
+        if python_option == 
PythonWithDependenciesOptions.SYSTEM_INSTALLED_PACKAGES:
             if not MinifiContainer.MINIFI_TAG_PREFIX or "bookworm" in 
MinifiContainer.MINIFI_TAG_PREFIX or "noble" in 
MinifiContainer.MINIFI_TAG_PREFIX:
                 additional_cmd = "RUN pip3 install --break-system-packages 
'langchain<=0.17.0'"
             else:
                 additional_cmd = "RUN pip3 install 'langchain<=0.17.0'"
-        elif python_option == PythonOptions.REQUIREMENTS_FILE:
+        elif python_option == PythonWithDependenciesOptions.REQUIREMENTS_FILE:
             requirements_install_command = "echo 'langchain<=0.17.0' > 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt
 && \\"
-        elif python_option == PythonOptions.INLINE_DEFINED_PACKAGES:
+        elif python_option == 
PythonWithDependenciesOptions.INLINE_DEFINED_PACKAGES:
             parse_document_sed_cmd = parse_document_sed_cmd[:-2] + ' sed -i 
"s/langchain==[0-9.]\\+/langchain<=0.17.0/" 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ParseDocument.py
 && \\'
             chunk_document_sed_cmd = 'sed -i 
"s/\\[\\\'langchain\\\'\\]/\\[\\\'langchain<=0.17.0\\\'\\]/" 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ChunkDocument.py
 && \\'
         if not MinifiContainer.MINIFI_TAG_PREFIX:
@@ -159,18 +161,6 @@ class ImageStore:
                 {pip3_install_command}
                 {additional_cmd}
                 USER minificpp
-                COPY RotatingForwarder.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py
-                COPY SpecialPropertyTypeChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/SpecialPropertyTypeChecker.py
-                COPY ProcessContextInterfaceChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py
-                COPY CreateFlowFile.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py
-                COPY FailureWithAttributes.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py
-                COPY subtractutils.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py
-                COPY RelativeImporterProcessor.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py
-                COPY multiplierutils.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py
-                COPY CreateNothing.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py
-                COPY FailureWithContent.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py
-                COPY TransferToOriginal.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py
-                COPY SetRecordField.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/SetRecordField.py
                 RUN wget {parse_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     wget {chunk_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     echo 'langchain<=0.17.0' > 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt
 && \\
@@ -189,6 +179,32 @@ class ImageStore:
                            
requirements_install_command=requirements_install_command,
                            parse_document_sed_cmd=parse_document_sed_cmd,
                            chunk_document_sed_cmd=chunk_document_sed_cmd))
+        return self.__build_image(dockerfile)
+
+    def __build_minifi_cpp_image_with_nifi_python_processors(self):
+        pip3_install_command = ""
+        if not MinifiContainer.MINIFI_TAG_PREFIX:
+            pip3_install_command = "RUN apk --update --no-cache add py3-pip"
+        dockerfile = dedent("""\
+                FROM {base_image}
+                USER root
+                {pip3_install_command}
+                USER minificpp
+                COPY RotatingForwarder.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py
+                COPY SpecialPropertyTypeChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/SpecialPropertyTypeChecker.py
+                COPY ProcessContextInterfaceChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py
+                COPY CreateFlowFile.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py
+                COPY FailureWithAttributes.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py
+                COPY subtractutils.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py
+                COPY RelativeImporterProcessor.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py
+                COPY multiplierutils.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py
+                COPY CreateNothing.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py
+                COPY FailureWithContent.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py
+                COPY TransferToOriginal.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py
+                COPY SetRecordField.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/SetRecordField.py
+                COPY TestStateManager.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/TestStateManager.py
+                RUN python3 -m venv /opt/minifi/minifi-current/venv
+                """.format(base_image='apacheminificpp:' + 
MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, 
pip3_install_command=pip3_install_command))
 
         def build_full_python_resource_path(resource):
             return os.path.join(self.test_dir, "resources", "python", resource)
@@ -206,6 +222,7 @@ class ImageStore:
             build_full_python_resource_path("FailureWithContent.py"),
             build_full_python_resource_path("TransferToOriginal.py"),
             build_full_python_resource_path("SetRecordField.py"),
+            build_full_python_resource_path("TestStateManager.py"),
         ])
 
     def __build_http_proxy_image(self):
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py 
b/docker/test/integration/cluster/containers/MinifiContainer.py
index 10bc2e84f..381388633 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -37,6 +37,7 @@ class MinifiOptions:
         self.use_nifi_python_processors_with_virtualenv = False
         self.use_nifi_python_processors_with_virtualenv_packages_installed = 
False
         self.remove_python_requirements_txt = False
+        self.use_nifi_python_processors_without_dependencies = False
         self.config_format = "json"
         self.use_flow_config_from_url = False
         self.set_ssl_context_properties = False
@@ -154,7 +155,7 @@ class MinifiContainer(FlowContainer):
                 f.write("controller.socket.port=9998\n")
                 f.write("controller.socket.local.any.interface=false\n")
 
-            if self.options.use_nifi_python_processors_with_virtualenv or 
self.options.remove_python_requirements_txt:
+            if self.options.use_nifi_python_processors_with_virtualenv or 
self.options.remove_python_requirements_txt or 
self.options.use_nifi_python_processors_without_dependencies:
                 
f.write("nifi.python.virtualenv.directory=/opt/minifi/minifi-current/venv\n")
             elif 
self.options.use_nifi_python_processors_with_virtualenv_packages_installed:
                 
f.write("nifi.python.virtualenv.directory=/opt/minifi/minifi-current/venv-with-langchain\n")
@@ -193,6 +194,8 @@ class MinifiContainer(FlowContainer):
             image = self.image_store.get_image('minifi-cpp-nifi-python')
         elif self.options.remove_python_requirements_txt:
             image = 
self.image_store.get_image('minifi-cpp-nifi-with-inline-python-dependencies')
+        elif self.options.use_nifi_python_processors_without_dependencies:
+            image = 
self.image_store.get_image('minifi-cpp-nifi-with-python-without-dependencies')
         else:
             image = 'apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + 
MinifiContainer.MINIFI_VERSION
 
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index fbc78112d..bb3a4090a 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -392,6 +392,9 @@ class MiNiFi_integration_test:
     def remove_python_requirements_txt_in_minifi(self):
         self.cluster.remove_python_requirements_txt_in_minifi()
 
+    def use_nifi_python_processors_without_dependencies_in_minifi(self):
+        
self.cluster.use_nifi_python_processors_without_dependencies_in_minifi()
+
     def set_yaml_in_minifi(self):
         self.cluster.set_yaml_in_minifi()
 
diff --git a/docker/test/integration/features/python.feature 
b/docker/test/integration/features/python.feature
index 5c63a4e31..c9791f31a 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -76,7 +76,7 @@ Feature: MiNiFi can use python processors in its flows
     And the "Chunk Overlap" property of the ChunkDocument processor is set to 
"3"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
-    And python is installed on the MiNiFi agent <python_install_mode>
+    And python with langchain is installed on the MiNiFi agent 
<python_install_mode>
 
     And the "success" relationship of the GetFile processor is connected to 
the ParseDocument
     And the "success" relationship of the ParseDocument processor is connected 
to the ChunkDocument
@@ -101,7 +101,7 @@ Feature: MiNiFi can use python processors in its flows
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
     And the "space" relationship of the CreateFlowFile processor is connected 
to the PutFile
     And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
     When the MiNiFi instance starts up
     Then a flowfile with the content "Hello World!" is placed in the monitored 
directory in less than 10 seconds
     And the Minifi logs contain the following message: "key:filename value:" 
in less than 60 seconds
@@ -116,7 +116,7 @@ Feature: MiNiFi can use python processors in its flows
     And a file with filename "test_file4.log" and content "test_data_four" is 
present in "/tmp/input"
     And a RotatingForwarder processor
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GetFile processor is connected to 
the RotatingForwarder
     And the "first" relationship of the RotatingForwarder processor is 
connected to the PutFile
@@ -133,7 +133,7 @@ Feature: MiNiFi can use python processors in its flows
     Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
     And a SpecialPropertyTypeChecker processor
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
     And a SSL context service is set up for the following processor: 
"SpecialPropertyTypeChecker"
 
     And the "success" relationship of the GenerateFlowFile processor is 
connected to the SpecialPropertyTypeChecker
@@ -148,7 +148,7 @@ Feature: MiNiFi can use python processors in its flows
     Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
     And a ProcessContextInterfaceChecker processor
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GenerateFlowFile processor is 
connected to the ProcessContextInterfaceChecker
     And the "myrelationship" relationship of the 
ProcessContextInterfaceChecker processor is connected to the PutFile
@@ -164,7 +164,7 @@ Feature: MiNiFi can use python processors in its flows
     And the "error.message" property of the UpdateAttribute processor is set 
to "Old error"
     And a FailureWithAttributes processor
     And a LogAttribute processor
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GenerateFlowFile processor is 
connected to the UpdateAttribute
     And the "success" relationship of the UpdateAttribute processor is 
connected to the FailureWithAttributes
@@ -180,7 +180,7 @@ Feature: MiNiFi can use python processors in its flows
     Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
     And a RelativeImporterProcessor processor
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GenerateFlowFile processor is 
connected to the RelativeImporterProcessor
     And the "success" relationship of the RelativeImporterProcessor processor 
is connected to the PutFile
@@ -194,7 +194,7 @@ Feature: MiNiFi can use python processors in its flows
     Given a CreateNothing processor
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the CreateNothing processor is connected 
to the PutFile
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
     When the MiNiFi instance starts up
     Then no files are placed in the monitored directory in 10 seconds of 
running time
     And the Minifi logs do not contain the following message: "Caught 
Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 
seconds
@@ -203,7 +203,7 @@ Feature: MiNiFi can use python processors in its flows
   Scenario: NiFi native python processor cannot specify content of failure 
result
     Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
     And a FailureWithContent processor
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GenerateFlowFile processor is 
connected to the FailureWithContent
 
@@ -215,7 +215,7 @@ Feature: MiNiFi can use python processors in its flows
   Scenario: NiFi native python processor cannot transfer to original 
relationship
     Given a GenerateFlowFile processor with the "File Size" property set to 
"0B"
     And a TransferToOriginal processor
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GenerateFlowFile processor is 
connected to the TransferToOriginal
 
@@ -234,7 +234,7 @@ Feature: MiNiFi can use python processors in its flows
     And a JsonRecordSetWriter controller service is set up with "Array" output 
grouping
     And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
     And the "Log Payload" property of the LogAttribute processor is set to 
"true"
-    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    And python virtualenv is installed on the MiNiFi agent
 
     And the "success" relationship of the GetFile processor is connected to 
the SetRecordField
     And the "success" relationship of the SetRecordField processor is 
connected to the LogAttribute
@@ -246,3 +246,13 @@ Feature: MiNiFi can use python processors in its flows
     And the Minifi logs contain the following message: '[{"name":"Zoe"}]' in 
less than 5 seconds
     And the Minifi logs contain the following message: 
'[{"group":"group1","name":"Steve"}]' in less than 5 seconds
     And the Minifi logs contain the following message: '[{}]' in less than 5 
seconds
+
+  @USE_NIFI_PYTHON_PROCESSORS
+  Scenario: MiNiFi C++ can use state manager commands in native NiFi python 
processors
+    Given a TestStateManager processor
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And the "success" relationship of the TestStateManager processor is 
connected to the LogAttribute
+    And python virtualenv is installed on the MiNiFi agent
+    When the MiNiFi instance starts up
+    Then the Minifi logs contain the following message: "key:state_key 
value:1" in less than 60 seconds
+    And the Minifi logs contain the following message: "key:state_key value:2" 
in less than 60 seconds
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index e9b45c2ab..76e91d59e 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -1336,7 +1336,7 @@ def step_impl(context):
 
 
 # Python
-@given("python is installed on the MiNiFi agent {install_mode}")
+@given("python with langchain is installed on the MiNiFi agent {install_mode}")
 def step_impl(context, install_mode):
     if install_mode == "with required python packages":
         
context.test.use_nifi_python_processors_with_system_python_packages_installed_in_minifi()
@@ -1350,6 +1350,11 @@ def step_impl(context, install_mode):
         raise Exception("Unknown python install mode.")
 
 
+@given("python virtualenv is installed on the MiNiFi agent")
+def step_impl(context):
+    context.test.use_nifi_python_processors_without_dependencies_in_minifi()
+
+
 @given("the example MiNiFi python processors are present")
 def step_impl(context):
     context.test.enable_example_minifi_python_processors()
diff --git a/docker/test/integration/minifi/processors/TestStateManager.py 
b/docker/test/integration/minifi/processors/TestStateManager.py
new file mode 100644
index 000000000..629ea51f3
--- /dev/null
+++ b/docker/test/integration/minifi/processors/TestStateManager.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class TestStateManager(Processor):
+    def __init__(self, context, schedule={'scheduling period': '1 sec'}):
+        super(TestStateManager, self).__init__(
+            context=context,
+            clazz='TestStateManager',
+            
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
+            schedule=schedule,
+            auto_terminate=[])
diff --git a/docker/test/integration/resources/python/TestStateManager.py 
b/docker/test/integration/resources/python/TestStateManager.py
new file mode 100644
index 000000000..b5d657176
--- /dev/null
+++ b/docker/test/integration/resources/python/TestStateManager.py
@@ -0,0 +1,48 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from nifiapi.componentstate import Scope
+from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
+
+
+class TestStateManager(FlowFileSource):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileSource']
+
+    class ProcessorDetails:
+        version = '0.0.1-SNAPSHOT'
+        description = '''A Python source processor that uses StateManager.'''
+        tags = ['text', 'test', 'python', 'source']
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return []
+
+    def create(self, context):
+        state_manager = context.getStateManager()
+        state = state_manager.getState(Scope.CLUSTER)
+        old_value = state.get("state_key")
+        if not old_value:
+            new_state = {'state_key': '1'}
+            state_manager.setState(new_state, Scope.CLUSTER)
+        elif old_value == '1':
+            new_state = {'state_key': '2'}
+            state_manager.replace(state, new_state, Scope.CLUSTER)
+        else:
+            state_manager.clear(Scope.CLUSTER)
+
+        return FlowFileSourceResult(relationship='success', 
attributes=state.toMap(), contents='Output FlowFile Contents')
diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md
index d99ac654b..3dfbc5d7f 100644
--- a/extensions/python/PYTHON.md
+++ b/extensions/python/PYTHON.md
@@ -172,6 +172,7 @@ Due to some differences between the NiFi and MiNiFi C++ 
processors and implement
 - The interface of the `ProcessContext` class is a bit more limited in MiNiFi 
C++ compared to NiFi. The available methods in `ProcessContext` are 
`getProperty`, `getStateManager`, `getName` and `getProperties`.
 - Success relationship is always present in all Python processors even if 
custom relationships are defined in the Python processor with the 
`getRelationships` method.
 - MiNiFi C++ uses a single embedded Python interpreter for all Python 
processors, so the Python processors share the same Python interpreter. This 
means that the Python processors cannot have different Python versions or use 
different Python packages. The Python packages are installed on the system or 
in a single virtualenv that is shared by all Python processors.
+- The state manager API is available with the same interface as in NiFi, but MiNiFi C++ uses transactional state management: because of dirty-read protection, state that is changed during a processor trigger cannot be read back within that same trigger. The new state becomes readable in the next trigger, after the session has been committed. Also, MiNiFi C++ does not use clustering, so the scope parameter of the state operations is ignored; the scope is always local.
 
 ## Use Python processors from virtualenv
 
diff --git a/extensions/python/pythonprocessors/nifiapi/componentstate.py 
b/extensions/python/pythonprocessors/nifiapi/componentstate.py
new file mode 100644
index 000000000..f6cf7e6b7
--- /dev/null
+++ b/extensions/python/pythonprocessors/nifiapi/componentstate.py
@@ -0,0 +1,120 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from enum import Enum
+from typing import Dict, Optional
+from minifi_native import StateManager as CppStateManager
+
+
+class Scope(Enum):
+    """
+    Scope of a state operation, kept for compatibility with the NiFi Python API.
+
+    MiNiFi C++ does not use clustering, so StateManager ignores the scope and
+    state is always local.
+    """
+    CLUSTER = 1
+    LOCAL = 2
+
+
+class StateMap:
+    """
+    Snapshot of a component's state, as returned by StateManager.getState().
+    """
+
+    def __init__(self, state_map: Optional[Dict[str, str]]):
+        # A None state (NOTE(review): presumably "nothing stored yet") is treated as empty.
+        self.state_map = state_map if state_map is not None else {}
+
+    def getStateVersion(self) -> int:
+        """
+        Return the version of this snapshot.
+
+        Versions are not tracked by this implementation; the result is always 1.
+        """
+        return 1
+
+    def get(self, key: str) -> Optional[str]:
+        """
+        Return the value stored under ``key``, or None if the key is not present.
+        """
+        return self.state_map.get(key)
+
+    def toMap(self) -> Dict[str, str]:
+        """
+        Return all key-value pairs of this snapshot.
+
+        A copy is returned so that callers cannot mutate the snapshot by accident.
+        """
+        return dict(self.state_map)
+
+
+class StateException(Exception):
+    """
+    Exception class for any exception produced by the operations in StateManager.
+    """
+
+
+class StateManager:
+    """
+    Python class wrapping the StateManager CPP implementation.
+
+    The ``scope`` argument of each method is accepted for compatibility with the
+    NiFi Python API but is ignored, as MiNiFi C++ state is always local.
+    Exceptions raised by the native state manager are re-raised as StateException.
+    """
+
+    def __init__(self, cpp_state_manager: CppStateManager):
+        self.cpp_state_manager = cpp_state_manager
+
+    def setState(self, state: Dict[str, str], scope: Scope) -> bool:
+        """
+        Store ``state`` through the native state manager's set().
+
+        :return: the result reported by the native state manager
+        """
+        try:
+            return self.cpp_state_manager.set(state)
+        except Exception as exception:
+            raise StateException("Set state failed") from exception
+
+    def getState(self, scope: Scope) -> StateMap:
+        """
+        Return a snapshot of the currently stored state.
+        """
+        try:
+            return StateMap(self.cpp_state_manager.get())
+        except Exception as exception:
+            raise StateException("Get state failed") from exception
+
+    def replace(self, old_state: StateMap, new_values: Dict[str, str], scope: Scope) -> bool:
+        """
+        Store ``new_values`` only if the currently stored state equals ``old_state``.
+
+        :return: False if the stored state differs from ``old_state``, else the update result
+        """
+        try:
+            return self.cpp_state_manager.replace(old_state.toMap(), new_values)
+        except Exception as exception:
+            raise StateException("Replace state failed") from exception
+
+    def clear(self, scope: Scope) -> None:
+        """
+        Clear the stored state through the native state manager's clear().
+        """
+        try:
+            self.cpp_state_manager.clear()
+        except Exception as exception:
+            raise StateException("Clear state failed") from exception
diff --git a/extensions/python/pythonprocessors/nifiapi/properties.py 
b/extensions/python/pythonprocessors/nifiapi/properties.py
index 996a0bece..e4f71a9b4 100644
--- a/extensions/python/pythonprocessors/nifiapi/properties.py
+++ b/extensions/python/pythonprocessors/nifiapi/properties.py
@@ -15,9 +15,10 @@
 
 from enum import Enum
 from typing import List, Dict
-from minifi_native import ProcessSession, StateManager, 
timePeriodStringToMilliseconds, dataSizeStringToBytes
+from minifi_native import ProcessSession, timePeriodStringToMilliseconds, 
dataSizeStringToBytes
 from minifi_native import FlowFile as CppFlowFile
 from minifi_native import ProcessContext as CppProcessContext
+from .componentstate import StateManager
 
 
 # This is a mock for NiFi's StandardValidators class methods, that return the 
property type equivalent in MiNiFi C++ if exists
@@ -294,7 +295,7 @@ class ProcessContext:
         return PythonPropertyValue(self.cpp_context, property_name, 
property_value, expression_language_support, controller_service_definition)
 
     def getStateManager(self) -> StateManager:
-        return self.cpp_context.getStateManager()
+        return StateManager(self.cpp_context.getStateManager())
 
     def getName(self) -> str:
         return self.cpp_context.getName()
diff --git a/extensions/python/types/PyStateManager.cpp 
b/extensions/python/types/PyStateManager.cpp
index 957e8aa51..0b0993a68 100644
--- a/extensions/python/types/PyStateManager.cpp
+++ b/extensions/python/types/PyStateManager.cpp
@@ -23,6 +23,8 @@ namespace org::apache::nifi::minifi::extensions::python {
 static PyMethodDef PyStateManager_methods[] = {  // 
NOLINT(cppcoreguidelines-avoid-c-arrays)
     {"get", (PyCFunction) PyStateManager::get, METH_VARARGS, nullptr},
     {"set", (PyCFunction) PyStateManager::set, METH_VARARGS, nullptr},
+    {"clear", (PyCFunction) PyStateManager::clear, METH_VARARGS, nullptr},
+    {"replace", (PyCFunction) PyStateManager::replace, METH_VARARGS, nullptr},
     {}  /* Sentinel */
 };
 
@@ -92,6 +94,56 @@ PyObject* PyStateManager::get(PyStateManager* self, PyObject*) {
   }
 }
 
+// Converts a Python dict into a C++ state map; keys and values are read with BorrowedStr::toUtf8String().
+// Shared by the methods that receive a state dict from Python, so the conversion is done in one place.
+static core::StateManager::State stateFromPythonDict(BorrowedDict& python_state) {
+  core::StateManager::State cpp_state;
+  auto keys = OwnedList(PyDict_Keys(python_state.get()));
+  for (size_t i = 0; i < keys.length(); ++i) {
+    BorrowedStr key{keys[i]};
+    auto key_str = key.toUtf8String();
+    if (auto value = python_state[key_str]) {
+      BorrowedStr value_str{*value};
+      cpp_state[key_str] = value_str.toUtf8String();
+    }
+  }
+  return cpp_state;
+}
+
+PyObject* PyStateManager::clear(PyStateManager* self, PyObject* /*args*/) {
+  if (!self->state_manager_) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading state manager outside 'on_trigger'");
+    return nullptr;
+  }
+
+  // NOTE(review): any result reported by core::StateManager::clear() is discarded here, so a failed
+  // clear is not surfaced to Python; confirm this is intended.
+  self->state_manager_->clear();
+  Py_RETURN_NONE;
+}
+
+// Implements StateManager.replace(old_state, new_state): stores new_state only if the currently
+// stored state equals old_state, and returns False otherwise.
+PyObject* PyStateManager::replace(PyStateManager* self, PyObject* args) {
+  if (!self->state_manager_) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading state manager outside 'on_trigger'");
+    return nullptr;
+  }
+
+  auto old_python_state = BorrowedDict::fromTuple(args, 0);
+  const auto old_cpp_state = stateFromPythonDict(old_python_state);
+
+  // A missing stored state only matches an empty expected state.
+  const auto current_cpp_state = self->state_manager_->get();
+  const bool state_matches = current_cpp_state ? (*current_cpp_state == old_cpp_state) : old_cpp_state.empty();
+  if (!state_matches) {
+    return object::returnReference(false);
+  }
+
+  auto new_python_state = BorrowedDict::fromTuple(args, 1);
+  return object::returnReference(self->state_manager_->set(stateFromPythonDict(new_python_state)));
+}
+
 PyTypeObject* PyStateManager::typeObject() {
   static OwnedObject PyStateManagerType{PyType_FromSpec(&PyStateManagerTypeSpec)};
   return reinterpret_cast<PyTypeObject*>(PyStateManagerType.get());
diff --git a/extensions/python/types/PyStateManager.h 
b/extensions/python/types/PyStateManager.h
index 7e0c71873..b762a23da 100644
--- a/extensions/python/types/PyStateManager.h
+++ b/extensions/python/types/PyStateManager.h
@@ -35,6 +35,8 @@ struct PyStateManager {
 
   static PyObject* set(PyStateManager* self, PyObject* args);
   static PyObject* get(PyStateManager* self, PyObject* args);
+  static PyObject* clear(PyStateManager* self, PyObject* args);
+  static PyObject* replace(PyStateManager* self, PyObject* args);
 
   static PyTypeObject* typeObject();
 };


Reply via email to