This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 5777a3b8f88bdf3fbdd7f18b927b423fc350ecb5
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Fri Jun 21 14:12:16 2024 +0200

    MINIFICPP-2411 Extend NiFi Python API to support source processors
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    This closes #1830
---
 docker/test/integration/cluster/ImageStore.py      |  4 +-
 docker/test/integration/features/python.feature    | 13 ++++
 .../minifi/processors/CreateFlowFile.py            | 25 +++++++
 .../integration/resources/python/CreateFlowFile.py | 58 ++++++++++++++++
 extensions/python/PYTHON.md                        |  2 +-
 .../pythonprocessors/nifiapi/flowfilesource.py     | 77 ++++++++++++++++++++++
 .../pythonprocessors/nifiapi/flowfiletransform.py  | 45 ++-----------
 .../{flowfiletransform.py => processorbase.py}     | 62 +----------------
 extensions/python/types/PyProcessSession.cpp       | 51 +++++++++++++-
 extensions/python/types/PyProcessSession.h         |  2 +
 msi/WixWin.wsi.in                                  |  2 +
 11 files changed, 237 insertions(+), 104 deletions(-)

diff --git a/docker/test/integration/cluster/ImageStore.py 
b/docker/test/integration/cluster/ImageStore.py
index 03a5b0b94..43e2174bb 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -163,6 +163,7 @@ class ImageStore:
                 COPY RotatingForwarder.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py
                 COPY SpecialPropertyTypeChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/SpecialPropertyTypeChecker.py
                 COPY ProcessContextInterfaceChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py
+                COPY CreateFlowFile.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py
                 RUN wget {parse_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     wget {chunk_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     echo 'langchain<=0.17.0' > 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt
 && \\
@@ -184,7 +185,8 @@ class ImageStore:
 
         return self.__build_image(dockerfile, [os.path.join(self.test_dir, 
"resources", "python", "RotatingForwarder.py"),
                                                os.path.join(self.test_dir, 
"resources", "python", "SpecialPropertyTypeChecker.py"),
-                                               os.path.join(self.test_dir, 
"resources", "python", "ProcessContextInterfaceChecker.py")])
+                                               os.path.join(self.test_dir, 
"resources", "python", "ProcessContextInterfaceChecker.py"),
+                                               os.path.join(self.test_dir, 
"resources", "python", "CreateFlowFile.py")])
 
     def __build_http_proxy_image(self):
         dockerfile = dedent("""\
diff --git a/docker/test/integration/features/python.feature 
b/docker/test/integration/features/python.feature
index c0761b517..7c862a233 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -94,6 +94,19 @@ Feature: MiNiFi can use python processors in its flows
       | with a pre-created virtualenv containing the required python packages |
       | using inline defined Python dependencies to install packages          |
 
+  @USE_NIFI_PYTHON_PROCESSORS
+  Scenario: MiNiFi C++ can use native NiFi source python processors
+    Given a CreateFlowFile processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And the "space" relationship of the CreateFlowFile processor is connected 
to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+    And python is installed on the MiNiFi agent with a pre-created virtualenv
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "Hello World!" is placed in the monitored 
directory in less than 10 seconds
+    And the Minifi logs contain the following message: "key:filename value:" 
in less than 60 seconds
+    And the Minifi logs contain the following message: "key:type value:space" 
in less than 60 seconds
+
   @USE_NIFI_PYTHON_PROCESSORS
   Scenario: MiNiFi C++ can use custom relationships in NiFi native python 
processors
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
diff --git a/docker/test/integration/minifi/processors/CreateFlowFile.py 
b/docker/test/integration/minifi/processors/CreateFlowFile.py
new file mode 100644
index 000000000..050532981
--- /dev/null
+++ b/docker/test/integration/minifi/processors/CreateFlowFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class CreateFlowFile(Processor):
+    def __init__(self, context, schedule={'scheduling period': '30 sec'}):
+        super(CreateFlowFile, self).__init__(
+            context=context,
+            clazz='CreateFlowFile',
+            
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
+            schedule=schedule,
+            auto_terminate=[])
diff --git a/docker/test/integration/resources/python/CreateFlowFile.py 
b/docker/test/integration/resources/python/CreateFlowFile.py
new file mode 100644
index 000000000..948734982
--- /dev/null
+++ b/docker/test/integration/resources/python/CreateFlowFile.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
+from nifiapi.properties import PropertyDescriptor
+from nifiapi.relationship import Relationship
+
+
+class CreateFlowFile(FlowFileSource):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileSource']
+
+    class ProcessorDetails:
+        version = '2.0.0-snapshot'
+        description = '''Test Python source processor.'''
+        tags = ['text', 'test', 'python', 'source']
+
+    FF_CONTENTS = PropertyDescriptor(
+        name='FlowFile Contents',
+        description='''The contents of the FlowFile.''',
+        required=True,
+        default_value='Hello World!'
+    )
+
+    property_descriptors = [FF_CONTENTS]
+
+    REL_MULTILINE = Relationship(name='space', description='FlowFiles that 
contain space characters.')
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.property_descriptors
+
+    def getRelationships(self):
+        return [self.REL_MULTILINE]
+
+    def create(self, context):
+        contents = context.getProperty(self.FF_CONTENTS).getValue()
+
+        if contents is not None and isinstance(contents, str):
+            contents_str = str.encode(contents)
+            if b' ' in contents_str:
+                return FlowFileSourceResult(relationship='space', 
attributes={"type": "space"}, contents=contents_str)
+
+        return FlowFileSourceResult(relationship='success', 
attributes={"type": "non-space"}, contents=contents)
diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md
index 7d8849504..22bcfa7cd 100644
--- a/extensions/python/PYTHON.md
+++ b/extensions/python/PYTHON.md
@@ -155,7 +155,7 @@ The SentimentAnalysis processor will perform a Vader 
Sentiment Analysis. This re
 
 ## Using NiFi Python Processors
 
-MiNiFi C++ supports the use of NiFi Python processors, that are inherited from 
the FlowFileTransform base class. To use these processors, copy the Python 
processor module to the nifi_python_processors subdirectory of the python 
directory. By default, the python directory is ${minifi_root}/minifi-python. To 
see how to write NiFi Python processors, please refer to the Python Developer 
Guide under the [Apache NiFi 
documentation](https://nifi.apache.org/documentation/v2/).
+MiNiFi C++ supports the use of NiFi Python processors that inherit from 
the FlowFileTransform or the FlowFileSource base class. To use these 
processors, copy the Python processor module to the nifi_python_processors 
subdirectory of the python directory. By default, the python directory is 
${minifi_root}/minifi-python. To see how to write NiFi Python processors, 
please refer to the Python Developer Guide under the [Apache NiFi 
documentation](https://nifi.apache.org/documentation/v2/).
 
 In the flow configuration these Python processors can be referenced by their 
fully qualified class name, which looks like this: 
org.apache.nifi.minifi.processors.nifi_python_processors.<package_name>.<processor_name>.
 For example, the fully qualified class name of the PromptChatGPT processor 
implemented in the file nifi_python_processors/PromptChatGPT.py is 
org.apache.nifi.minifi.processors.nifi_python_processors.PromptChatGPT. If a 
processor is copied under a subdirectory, because it is [...]
 
diff --git a/extensions/python/pythonprocessors/nifiapi/flowfilesource.py 
b/extensions/python/pythonprocessors/nifiapi/flowfilesource.py
new file mode 100644
index 000000000..46510a925
--- /dev/null
+++ b/extensions/python/pythonprocessors/nifiapi/flowfilesource.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+from .properties import ProcessContext as ProcessContextProxy
+from abc import abstractmethod
+from minifi_native import ProcessContext, ProcessSession, FlowFile
+from .processorbase import ProcessorBase, WriteCallback
+
+
+class FlowFileSourceResult:
+    def __init__(self, relationship: str, attributes=None, contents=None):
+        self.relationship = relationship
+        self.attributes = attributes
+        if contents is not None and isinstance(contents, str):
+            self.contents = str.encode(contents)
+        else:
+            self.contents = contents
+
+    def getRelationship(self):
+        return self.relationship
+
+    def getContents(self):
+        return self.contents
+
+    def getAttributes(self):
+        return self.attributes
+
+
+class FlowFileSource(ProcessorBase):
+    # These will be added through the python bindings using C API
+    logger = None
+    REL_SUCCESS = None
+    REL_FAILURE = None
+    REL_ORIGINAL = None
+
+    def onTrigger(self, context: ProcessContext, session: ProcessSession):
+        context_proxy = ProcessContextProxy(context, self)
+        try:
+            result = self.create(context_proxy)
+        except Exception:
+            self.logger.error("Failed to create flow file due to 
error:\n{}".format(traceback.format_exc()))
+            return
+
+        flow_file = self.createFlowFile(session, result.getAttributes(), 
result.getContents())
+
+        if result.getRelationship() == "success":
+            session.transfer(flow_file, self.REL_SUCCESS)
+        else:
+            session.transferToCustomRelationship(flow_file, 
result.getRelationship())
+
+    def createFlowFile(self, session: ProcessSession, attributes, contents) -> 
FlowFile:
+        flow_file = session.create()
+        if attributes:
+            for key in attributes:
+                flow_file.addAttribute(key, attributes[key])
+
+        if contents:
+            session.write(flow_file, WriteCallback(contents))
+
+        return flow_file
+
+    @abstractmethod
+    def create(self, context):
+        pass
diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py 
b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
index d17facd05..94487c3bc 100644
--- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
+++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
@@ -14,21 +14,11 @@
 # limitations under the License.
 
 import traceback
-from abc import ABC, abstractmethod
-from typing import List
-from .properties import ExpressionLanguageScope, PropertyDescriptor, 
translateStandardValidatorToMiNiFiPropertype
+from abc import abstractmethod
+from minifi_native import ProcessContext, ProcessSession
+from .processorbase import ProcessorBase, WriteCallback
 from .properties import FlowFile as FlowFileProxy
 from .properties import ProcessContext as ProcessContextProxy
-from minifi_native import OutputStream, Processor, ProcessContext, 
ProcessSession
-
-
-class WriteCallback:
-    def __init__(self, content):
-        self.content = content
-
-    def process(self, output_stream: OutputStream):
-        output_stream.write(self.content)
-        return len(self.content)
 
 
 class FlowFileTransformResult:
@@ -50,37 +40,13 @@ class FlowFileTransformResult:
         return self.attributes
 
 
-class FlowFileTransform(ABC):
+class FlowFileTransform(ProcessorBase):
     # These will be added through the python bindings using C API
     logger = None
     REL_SUCCESS = None
     REL_FAILURE = None
     REL_ORIGINAL = None
 
-    def describe(self, processor: Processor):
-        if hasattr(self, 'ProcessorDetails') and 
hasattr(self.ProcessorDetails, 'description'):
-            processor.setDescription(self.ProcessorDetails.description)
-        else:
-            processor.setDescription(self.__class__.__name__)
-
-    def onInitialize(self, processor: Processor):
-        processor.setSupportsDynamicProperties()
-        for property in self.getPropertyDescriptors():
-            property_type_code = 
translateStandardValidatorToMiNiFiPropertype(property.validators)
-            expression_language_supported = True if 
property.expressionLanguageScope != ExpressionLanguageScope.NONE else False
-
-            # MiNiFi C++ does not support dependant properties, so if a 
property depends on another property, it should not be required
-            is_required = True if property.required and not 
property.dependencies else False
-            processor.addProperty(property.name, property.description, 
property.defaultValue, is_required, expression_language_supported,
-                                  property.sensitive, property_type_code, 
property.controllerServiceDefinition)
-
-    def onScheduled(self, context_proxy: ProcessContextProxy):
-        pass
-
-    def onSchedule(self, context: ProcessContext):
-        context_proxy = ProcessContextProxy(context, self)
-        self.onScheduled(context_proxy)
-
     def onTrigger(self, context: ProcessContext, session: ProcessSession):
         original_flow_file = session.get()
         if not original_flow_file:
@@ -121,6 +87,3 @@ class FlowFileTransform(ABC):
     @abstractmethod
     def transform(self, context: ProcessContextProxy, flowFile: FlowFileProxy) 
-> FlowFileTransformResult:
         pass
-
-    def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
-        return []
diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py 
b/extensions/python/pythonprocessors/nifiapi/processorbase.py
similarity index 58%
copy from extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
copy to extensions/python/pythonprocessors/nifiapi/processorbase.py
index d17facd05..a343a4cb9 100644
--- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
+++ b/extensions/python/pythonprocessors/nifiapi/processorbase.py
@@ -13,11 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import traceback
 from abc import ABC, abstractmethod
 from typing import List
 from .properties import ExpressionLanguageScope, PropertyDescriptor, 
translateStandardValidatorToMiNiFiPropertype
-from .properties import FlowFile as FlowFileProxy
 from .properties import ProcessContext as ProcessContextProxy
 from minifi_native import OutputStream, Processor, ProcessContext, 
ProcessSession
 
@@ -31,26 +29,7 @@ class WriteCallback:
         return len(self.content)
 
 
-class FlowFileTransformResult:
-    def __init__(self, relationship: str, attributes=None, contents=None):
-        self.relationship = relationship
-        self.attributes = attributes
-        if contents is not None and isinstance(contents, str):
-            self.contents = str.encode(contents)
-        else:
-            self.contents = contents
-
-    def getRelationship(self):
-        return self.relationship
-
-    def getContents(self):
-        return self.contents
-
-    def getAttributes(self):
-        return self.attributes
-
-
-class FlowFileTransform(ABC):
+class ProcessorBase(ABC):
     # These will be added through the python bindings using C API
     logger = None
     REL_SUCCESS = None
@@ -81,45 +60,8 @@ class FlowFileTransform(ABC):
         context_proxy = ProcessContextProxy(context, self)
         self.onScheduled(context_proxy)
 
-    def onTrigger(self, context: ProcessContext, session: ProcessSession):
-        original_flow_file = session.get()
-        if not original_flow_file:
-            return
-
-        flow_file = session.clone(original_flow_file)
-
-        flow_file_proxy = FlowFileProxy(session, flow_file)
-        context_proxy = ProcessContextProxy(context, self)
-        try:
-            result = self.transform(context_proxy, flow_file_proxy)
-        except Exception:
-            self.logger.error("Failed to transform flow file due to 
error:\n{}".format(traceback.format_exc()))
-            session.remove(flow_file)
-            session.transfer(original_flow_file, self.REL_FAILURE)
-            return
-
-        if result.getRelationship() == "failure":
-            session.remove(flow_file)
-            session.transfer(original_flow_file, self.REL_FAILURE)
-            return
-
-        result_attributes = result.getAttributes()
-        if result_attributes is not None:
-            for attribute in result_attributes:
-                flow_file.addAttribute(attribute, result_attributes[attribute])
-
-        result_content = result.getContents()
-        if result_content is not None:
-            session.write(flow_file, WriteCallback(result_content))
-
-        if result.getRelationship() == "success":
-            session.transfer(flow_file, self.REL_SUCCESS)
-        else:
-            session.transferToCustomRelationship(flow_file, 
result.getRelationship())
-        session.transfer(original_flow_file, self.REL_ORIGINAL)
-
     @abstractmethod
-    def transform(self, context: ProcessContextProxy, flowFile: FlowFileProxy) 
-> FlowFileTransformResult:
+    def onTrigger(self, context: ProcessContext, session: ProcessSession):
         pass
 
     def getPropertyDescriptors(self) -> List[PropertyDescriptor]:
diff --git a/extensions/python/types/PyProcessSession.cpp 
b/extensions/python/types/PyProcessSession.cpp
index 1b20b4e5a..876c68bf7 100644
--- a/extensions/python/types/PyProcessSession.cpp
+++ b/extensions/python/types/PyProcessSession.cpp
@@ -132,7 +132,6 @@ void PyProcessSession::remove(const 
std::shared_ptr<core::FlowFile>& flow_file)
   if (!session_) {
     throw std::runtime_error("Access of ProcessSession after it has been 
released");
   }
-  std::shared_ptr<core::FlowFile> result;
 
   session_->remove(flow_file);
   flow_files_.erase(ranges::remove_if(flow_files_, [&flow_file](const auto& 
ff)-> bool { return ff == flow_file; }), flow_files_.end());
@@ -155,6 +154,14 @@ std::string PyProcessSession::getContentsAsString(const 
std::shared_ptr<core::Fl
   return content;
 }
 
+void PyProcessSession::putAttribute(const std::shared_ptr<core::FlowFile>& 
flow_file, std::string_view key, const std::string& value) {
+  if (!session_) {
+    throw std::runtime_error("Access of ProcessSession after it has been 
released");
+  }
+
+  session_->putAttribute(*flow_file, key, value);
+}
+
 extern "C" {
 
 static PyMethodDef PyProcessSessionObject_methods[] = {  // 
NOLINT(cppcoreguidelines-avoid-c-arrays)
@@ -167,6 +174,7 @@ static PyMethodDef PyProcessSessionObject_methods[] = {  // 
NOLINT(cppcoreguidel
     {"transferToCustomRelationship", (PyCFunction) 
PyProcessSessionObject::transferToCustomRelationship, METH_VARARGS, nullptr},
     {"remove", (PyCFunction) PyProcessSessionObject::remove, METH_VARARGS, 
nullptr},
     {"getContentsAsBytes", (PyCFunction) 
PyProcessSessionObject::getContentsAsBytes, METH_VARARGS, nullptr},
+    {"putAttribute", (PyCFunction) PyProcessSessionObject::putAttribute, 
METH_VARARGS, nullptr},
     {}  /* Sentinel */
 };
 
@@ -371,6 +379,47 @@ PyObject* 
PyProcessSessionObject::getContentsAsBytes(PyProcessSessionObject* sel
   return PyBytes_FromStringAndSize(content.c_str(), 
gsl::narrow<Py_ssize_t>(content.size()));
 }
 
+PyObject* PyProcessSessionObject::putAttribute(PyProcessSessionObject* self, 
PyObject* args) {
+  auto session = self->process_session_.lock();
+  if (!session) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading process session 
outside 'on_trigger'");
+    return nullptr;
+  }
+  PyObject* script_flow_file = nullptr;
+  const char* attribute_key = nullptr;
+  const char* attribute_value = nullptr;
+  if (!PyArg_ParseTuple(args, "O!ss", PyScriptFlowFile::typeObject(), 
&script_flow_file, &attribute_key, &attribute_value)) {
+    return nullptr;
+  }
+
+  const auto flow_file = 
reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
+  if (!flow_file) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 
'on_trigger'");
+    return nullptr;
+  }
+
+  if (!attribute_key) {
+    PyErr_SetString(PyExc_AttributeError, "Attribute key is invalid!");
+    return nullptr;
+  }
+
+  std::string attribute_key_str(attribute_key);
+  if (attribute_key_str.empty()) {
+    PyErr_SetString(PyExc_AttributeError, "Attribute key is empty!");
+    return nullptr;
+  }
+
+  if (!attribute_value) {
+    PyErr_SetString(PyExc_AttributeError, "Attribute value is invalid!");
+    return nullptr;
+  }
+
+  std::string attribute_value_str(attribute_value);
+
+  session->putAttribute(flow_file, attribute_key_str, attribute_value_str);
+  Py_RETURN_NONE;
+}
+
 PyTypeObject* PyProcessSessionObject::typeObject() {
   static OwnedObject 
PyProcessSessionObjectType{PyType_FromSpec(&PyProcessSessionObjectTypeSpec)};
   return reinterpret_cast<PyTypeObject*>(PyProcessSessionObjectType.get());
diff --git a/extensions/python/types/PyProcessSession.h 
b/extensions/python/types/PyProcessSession.h
index 14866a820..a78bb9107 100644
--- a/extensions/python/types/PyProcessSession.h
+++ b/extensions/python/types/PyProcessSession.h
@@ -38,6 +38,7 @@ class PyProcessSession {
   void write(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject 
output_stream_callback);
   void remove(const std::shared_ptr<core::FlowFile>& flow_file);
   std::string getContentsAsString(const std::shared_ptr<core::FlowFile>& 
flow_file);
+  void putAttribute(const std::shared_ptr<core::FlowFile>& flow_file, 
std::string_view key, const std::string& value);
 
  private:
   std::vector<std::shared_ptr<core::FlowFile>> flow_files_;
@@ -63,6 +64,7 @@ struct PyProcessSessionObject {
   static PyObject* transferToCustomRelationship(PyProcessSessionObject* self, 
PyObject* args);
   static PyObject* remove(PyProcessSessionObject* self, PyObject* args);
   static PyObject* getContentsAsBytes(PyProcessSessionObject* self, PyObject* 
args);
+  static PyObject* putAttribute(PyProcessSessionObject* self, PyObject* args);
 
   static PyTypeObject* typeObject();
 };
diff --git a/msi/WixWin.wsi.in b/msi/WixWin.wsi.in
index 85733f1e5..b12afc76b 100644
--- a/msi/WixWin.wsi.in
+++ b/msi/WixWin.wsi.in
@@ -395,7 +395,9 @@ ${WIX_EXTRA_COMPONENTS}
             <Directory Id="INSTALLPYTHONDIR" Name="minifi-python">
               <Directory Id="INSTALLNIFIAPIDIR" Name="nifiapi">
                 <Component Id="PythonProcessorNifiApiFiles" 
Guid="a9cb7b7b-e66d-4e32-9115-eab4aa980124">
+                  <File Id="NifiApi_processorbase" Name="processorbase.py" 
KeyPath="no" Source="pythonprocessors\nifiapi\processorbase.py"/>
                   <File Id="NifiApi_flowfiletransform" 
Name="flowfiletransform.py" KeyPath="no" 
Source="pythonprocessors\nifiapi\flowfiletransform.py"/>
+                  <File Id="NifiApi_flowfilesource" Name="flowfilesource.py" 
KeyPath="no" Source="pythonprocessors\nifiapi\flowfilesource.py"/>
                   <File Id="NifiApi_properties" Name="properties.py" 
KeyPath="no" Source="pythonprocessors\nifiapi\properties.py"/>
                   <File Id="NifiApi_relationship" Name="relationship.py" 
KeyPath="no" Source="pythonprocessors\nifiapi\relationship.py"/>
                   <File Id="NifiApi_documentation" Name="documentation.py" 
KeyPath="no" Source="pythonprocessors\nifiapi\documentation.py"/>

Reply via email to