This is an automated email from the ASF dual-hosted git repository.
martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 22c519f52 MINIFICPP-2635 Handle dynamic properties correctly in python
processors
22c519f52 is described below
commit 22c519f529ec301a016b97554831a2423a5dcc7a
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Sep 24 10:51:22 2025 +0200
MINIFICPP-2635 Handle dynamic properties correctly in python processors
- Fix dynamic property support flag setting in python processors
- Only support dynamic properties in NiFi style python processors if
`getDynamicPropertyDescriptor` is defined
- Fix `yieldResources` python binding
- Add `getDynamicProperty` and `getDynamicPropertyKeys` to minifi python API
- Only require property name and description when adding property to
processor using minifi python API
- Add docker tests for dynamic property handling in python processors
Closes #2034
Signed-off-by: Martin Zink <[email protected]>
---
PROCESSORS.md | 4 +-
docker/test/integration/cluster/ImageStore.py | 4 ++
docker/test/integration/features/environment.py | 2 +-
docker/test/integration/features/python.feature | 61 +++++++++++++---------
docker/test/integration/features/steps/steps.py | 2 +-
.../minifi/processors/LogDynamicProperties.py | 20 +++++++
.../processors/NifiStyleLogDynamicProperties.py | 25 +++++++++
.../resources/python/LogDynamicProperties.py | 35 +++++++++++++
.../python/NifiStyleLogDynamicProperties.py | 55 +++++++++++++++++++
extensions/python/ExecutePythonProcessor.h | 16 +++---
extensions/python/PYTHON.md | 21 ++++++--
extensions/python/PythonCreator.h | 2 +-
.../pythonprocessors/nifiapi/processorbase.py | 8 ++-
.../python/pythonprocessors/nifiapi/properties.py | 2 +
extensions/python/tests/PythonManifestTests.cpp | 5 ++
extensions/python/types/PyProcessContext.cpp | 52 +++++++++++++++++-
extensions/python/types/PyProcessContext.h | 2 +
extensions/python/types/PyProcessor.cpp | 47 ++++++++++-------
18 files changed, 306 insertions(+), 57 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 7baa9891b..6749be87b 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -722,7 +722,9 @@ In the list below, the names of required properties appear
in bold. Any other pr
### Description
-Executes a script given the flow file and a process session. The script is
responsible for handling the incoming flow file (transfer to SUCCESS or remove,
e.g.) as well as any flow files created by the script. If the handling is
incomplete or incorrect, the session will be rolled back. Scripts must define
an onTrigger function which accepts NiFi Context and ProcessSession objects.
Scripts are executed once when the processor is run, then the onTrigger method
is called for each incoming f [...]
+DEPRECATED. This processor should only be used internally for running NiFi and
MiNiFi C++ style python processors. Do not use this processor in your own
flows, move your python processors to the minifi-python directory instead,
where they will be parsed, and then they can be used with their filename as the
processor class in the flow configuration.
+
+This processor executes a script given the flow file and a process session.
The script is responsible for handling the incoming flow file (transfer to
SUCCESS or remove, e.g.) as well as any flow files created by the script. If
the handling is incomplete or incorrect, the session will be rolled back.
Scripts must define an onTrigger function which accepts NiFi Context and
ProcessSession objects. Scripts are executed once when the processor is run,
then the onTrigger method is called for [...]
### Properties
diff --git a/docker/test/integration/cluster/ImageStore.py
b/docker/test/integration/cluster/ImageStore.py
index 108dc7d0f..1b0802df2 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -211,6 +211,8 @@ class ImageStore:
COPY TransferToOriginal.py
{minifi_python_dir}/nifi_python_processors/TransferToOriginal.py
COPY SetRecordField.py
{minifi_python_dir}/nifi_python_processors/SetRecordField.py
COPY TestStateManager.py
{minifi_python_dir}/nifi_python_processors/TestStateManager.py
+ COPY NifiStyleLogDynamicProperties.py
{minifi_python_dir}/nifi_python_processors/NifiStyleLogDynamicProperties.py
+ COPY LogDynamicProperties.py
{minifi_python_dir}/LogDynamicProperties.py
RUN python3 -m venv {minifi_python_venv_parent}/venv
""".format(base_image='apacheminificpp:' +
MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
pip3_install_command=pip3_install_command,
@@ -234,6 +236,8 @@ class ImageStore:
build_full_python_resource_path("TransferToOriginal.py"),
build_full_python_resource_path("SetRecordField.py"),
build_full_python_resource_path("TestStateManager.py"),
+
build_full_python_resource_path("NifiStyleLogDynamicProperties.py"),
+ build_full_python_resource_path("LogDynamicProperties.py")
])
def __build_minifi_cpp_image_with_llamacpp_model(self):
diff --git a/docker/test/integration/features/environment.py
b/docker/test/integration/features/environment.py
index c6422a34c..0b485940b 100644
--- a/docker/test/integration/features/environment.py
+++ b/docker/test/integration/features/environment.py
@@ -48,7 +48,7 @@ def before_scenario(context, scenario):
logging.info("Integration test setup at
{time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
context.test = MiNiFi_integration_test(context=context,
feature_id=context.feature_id)
- if "USE_NIFI_PYTHON_PROCESSORS" in scenario.effective_tags:
+ if "USE_NIFI_PYTHON_PROCESSORS_WITH_LANGCHAIN" in scenario.effective_tags:
if not context.image_store.is_conda_available_in_minifi_image() and
context.image_store.get_minifi_image_python_version() < (3, 8, 1):
scenario.skip("NiFi Python processor tests use langchain library
which requires Python 3.8.1 or later.")
return
diff --git a/docker/test/integration/features/python.feature
b/docker/test/integration/features/python.feature
index f701e9842..966f9d81b 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -39,6 +39,19 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up
Then the Minifi logs contain the following message: "key:Python attribute
value:attributevalue" in less than 60 seconds
+ Scenario: A MiNiFi instance can handle dynamic properties through native
python processor
+ Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
+ And a LogDynamicProperties processor with the "Static Property" property
set to "static value"
+ And the "Dynamic Property" property of the LogDynamicProperties processor
is set to "dynamic value"
+ And the "success" relationship of the GenerateFlowFile processor is
connected to the LogDynamicProperties
+ And python processors without dependencies are present on the MiNiFi agent
+
+ When all instances start up
+ Then the Minifi logs contain the following message: "Static Property
value: static value" in less than 60 seconds
+ And the Minifi logs contain the following message: "Dynamic Property
value: dynamic value" in less than 60 seconds
+ And the Minifi logs contain the following message: "dynamic property key
count: 1" in less than 60 seconds
+ And the Minifi logs contain the following message: "dynamic property key:
Dynamic Property" in less than 60 seconds
+
Scenario: Native python processor can read empty input stream
Given the example MiNiFi python processors are present
And a GenerateFlowFile processor with the "File Size" property set to "0B"
@@ -67,7 +80,7 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up
Then flowfiles with these contents are placed in the monitored directory
in less than 5 seconds: "0,1,2,3,4,5"
- @USE_NIFI_PYTHON_PROCESSORS
+ @USE_NIFI_PYTHON_PROCESSORS_WITH_LANGCHAIN
Scenario Outline: MiNiFi C++ can use native NiFi python processors
Given a GetFile processor with the "Input Directory" property set to
"/tmp/input"
And a file with filename "test_file.log" and content "test_data" is
present in "/tmp/input"
@@ -94,20 +107,18 @@ Feature: MiNiFi can use python processors in its flows
| with a pre-created virtualenv containing the required python packages |
| using inline defined Python dependencies to install packages |
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ can use native NiFi source python processors
Given a CreateFlowFile processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to
"0"
And the "space" relationship of the CreateFlowFile processor is connected
to the PutFile
And the "success" relationship of the PutFile processor is connected to
the LogAttribute
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
When the MiNiFi instance starts up
Then a flowfile with the content "Hello World!" is placed in the monitored
directory in less than 10 seconds
And the Minifi logs contain the following message: "key:filename value:"
in less than 60 seconds
And the Minifi logs contain the following message: "key:type value:space"
in less than 60 seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ can use custom relationships in NiFi native python
processors
Given a GetFile processor with the "Input Directory" property set to
"/tmp/input"
And a file with filename "test_file.log" and content "test_data_one" is
present in "/tmp/input"
@@ -116,7 +127,7 @@ Feature: MiNiFi can use python processors in its flows
And a file with filename "test_file4.log" and content "test_data_four" is
present in "/tmp/input"
And a RotatingForwarder processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GetFile processor is connected to
the RotatingForwarder
And the "first" relationship of the RotatingForwarder processor is
connected to the PutFile
@@ -128,12 +139,11 @@ Feature: MiNiFi can use python processors in its flows
Then flowfiles with these contents are placed in the monitored directory
in less than 10 seconds:
"test_data_one,test_data_two,test_data_three,test_data_four"
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ can use special property types including controller
services in NiFi native python processors
Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
And a SpecialPropertyTypeChecker processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And a SSL context service is set up for the following processor:
"SpecialPropertyTypeChecker"
And the "success" relationship of the GenerateFlowFile processor is
connected to the SpecialPropertyTypeChecker
@@ -143,12 +153,11 @@ Feature: MiNiFi can use python processors in its flows
Then one flowfile with the contents "Check successful!" is placed in the
monitored directory in less than 30 seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processor's ProcessContext interface can be
used in MiNiFi C++
Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
And a ProcessContextInterfaceChecker processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is
connected to the ProcessContextInterfaceChecker
And the "myrelationship" relationship of the
ProcessContextInterfaceChecker processor is connected to the PutFile
@@ -157,14 +166,13 @@ Feature: MiNiFi can use python processors in its flows
Then one flowfile with the contents "Check successful!" is placed in the
monitored directory in less than 30 seconds
- @USE_NIFI_PYTHON_PROCESSORS
- Scenario: NiiFi native python processor can update attributes of a flow file
transferred to failure relationship
+ Scenario: NiFi native python processor can update attributes of a flow file
transferred to failure relationship
Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
And a UpdateAttribute processor with the "my.attribute" property set to
"my.value"
And the "error.message" property of the UpdateAttribute processor is set
to "Old error"
And a FailureWithAttributes processor
And a LogAttribute processor
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is
connected to the UpdateAttribute
And the "success" relationship of the UpdateAttribute processor is
connected to the FailureWithAttributes
@@ -175,12 +183,11 @@ Feature: MiNiFi can use python processors in its flows
Then the Minifi logs contain the following message: "key:error.message
value:Error" in less than 60 seconds
And the Minifi logs contain the following message: "key:my.attribute
value:my.value" in less than 10 seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processors support relative imports
Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
And a RelativeImporterProcessor processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is
connected to the RelativeImporterProcessor
And the "success" relationship of the RelativeImporterProcessor processor
is connected to the PutFile
@@ -189,21 +196,19 @@ Feature: MiNiFi can use python processors in its flows
Then one flowfile with the contents "The final result is 1990" is placed
in the monitored directory in less than 30 seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processor is allowed to be triggered without
creating any flow files
Given a CreateNothing processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the CreateNothing processor is connected
to the PutFile
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
When the MiNiFi instance starts up
Then no files are placed in the monitored directory in 10 seconds of
running time
And the Minifi logs do not contain the following message: "Caught
Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1
seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processor cannot specify content of failure
result
Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
And a FailureWithContent processor
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is
connected to the FailureWithContent
@@ -211,11 +216,10 @@ Feature: MiNiFi can use python processors in its flows
Then the Minifi logs contain the following message: "'failure'
relationship should not have content, the original flow file will be
transferred automatically in this case." in less than 60 seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: NiFi native python processor cannot transfer to original
relationship
Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
And a TransferToOriginal processor
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is
connected to the TransferToOriginal
@@ -223,7 +227,6 @@ Feature: MiNiFi can use python processors in its flows
Then the Minifi logs contain the following message: "Result relationship
cannot be 'original', it is reserved for the original flow file, and
transferred automatically in non-failure cases." in less than 60 seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ supports RecordTransform native python processors
Given a GetFile processor with the "Input Directory" property set to
"/tmp/input"
And a file with the content '{"group": "group1", "name":
"John"}\n{"group": "group1", "name": "Jane"}\n{"group": "group2", "name":
"Kyle"}\n{"name": "Zoe"}' is present in '/tmp/input'
@@ -234,7 +237,7 @@ Feature: MiNiFi can use python processors in its flows
And a JsonRecordSetWriter controller service is set up with "Array" output
grouping
And a LogAttribute processor with the "FlowFiles To Log" property set to
"0"
And the "Log Payload" property of the LogAttribute processor is set to
"true"
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GetFile processor is connected to
the SetRecordField
And the "success" relationship of the SetRecordField processor is
connected to the LogAttribute
@@ -247,12 +250,22 @@ Feature: MiNiFi can use python processors in its flows
And the Minifi logs contain the following message:
'[{"group":"group1","name":"Steve"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{}]' in less than 5
seconds
- @USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ can use state manager commands in native NiFi python
processors
Given a TestStateManager processor
And a LogAttribute processor with the "FlowFiles To Log" property set to
"0"
And the "success" relationship of the TestStateManager processor is
connected to the LogAttribute
- And python virtualenv is installed on the MiNiFi agent
+ And python processors without dependencies are present on the MiNiFi agent
When the MiNiFi instance starts up
Then the Minifi logs contain the following message: "key:state_key
value:1" in less than 60 seconds
And the Minifi logs contain the following message: "key:state_key value:2"
in less than 60 seconds
+
+ Scenario: MiNiFi C++ can use dynamic properties in native NiFi python
processors
+ Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
+ And a NifiStyleLogDynamicProperties processor with the "Static Property"
property set to "static value"
+ And the "Dynamic Property" property of the NifiStyleLogDynamicProperties
processor is set to "dynamic value"
+ And the "success" relationship of the GenerateFlowFile processor is
connected to the NifiStyleLogDynamicProperties
+ And python processors without dependencies are present on the MiNiFi agent
+
+ When all instances start up
+ Then the Minifi logs contain the following message: "Static Property
value: static value" in less than 60 seconds
+ And the Minifi logs contain the following message: "Dynamic Property
value: dynamic value" in less than 60 seconds
diff --git a/docker/test/integration/features/steps/steps.py
b/docker/test/integration/features/steps/steps.py
index 7111f317c..e3ed90946 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -1235,7 +1235,7 @@ def step_impl(context, install_mode):
raise Exception("Unknown python install mode.")
-@given("python virtualenv is installed on the MiNiFi agent")
+@given("python processors without dependencies are present on the MiNiFi
agent")
def step_impl(context):
context.test.use_nifi_python_processors_without_dependencies_in_minifi()
diff --git a/docker/test/integration/minifi/processors/LogDynamicProperties.py
b/docker/test/integration/minifi/processors/LogDynamicProperties.py
new file mode 100644
index 000000000..359b774af
--- /dev/null
+++ b/docker/test/integration/minifi/processors/LogDynamicProperties.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class LogDynamicProperties(Processor):
+ def __init__(self, context, schedule={'scheduling period': '1 sec'}):
+ super(LogDynamicProperties, self).__init__(context=context,
clazz='LogDynamicProperties',
class_prefix='org.apache.nifi.minifi.processors.', schedule=schedule)
diff --git
a/docker/test/integration/minifi/processors/NifiStyleLogDynamicProperties.py
b/docker/test/integration/minifi/processors/NifiStyleLogDynamicProperties.py
new file mode 100644
index 000000000..8b5fd7780
--- /dev/null
+++ b/docker/test/integration/minifi/processors/NifiStyleLogDynamicProperties.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class NifiStyleLogDynamicProperties(Processor):
+ def __init__(self, context, schedule={'scheduling period': '1 sec'}):
+ super(NifiStyleLogDynamicProperties, self).__init__(
+ context=context,
+ clazz='NifiStyleLogDynamicProperties',
+
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
+ schedule=schedule,
+ auto_terminate=[])
diff --git a/docker/test/integration/resources/python/LogDynamicProperties.py
b/docker/test/integration/resources/python/LogDynamicProperties.py
new file mode 100644
index 000000000..727e38363
--- /dev/null
+++ b/docker/test/integration/resources/python/LogDynamicProperties.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+def describe(processor):
+ processor.setDescription("Test description.")
+
+
+def onInitialize(processor):
+ processor.setSupportsDynamicProperties()
+ processor.addProperty("Static Property", "Static Property")
+
+
+def onTrigger(context, session):
+ flow_file = session.get()
+ property_value = context.getProperty("Static Property")
+ log.info("Static Property value: {}".format(property_value))
+ dyn_property_value = context.getDynamicProperty("Dynamic Property")
+ log.info("Dynamic Property value: {}".format(dyn_property_value))
+ keys = context.getDynamicPropertyKeys()
+ log.info("dynamic property key count: {}".format(len(keys)))
+ for dynamic_property_key in keys:
+ log.info("dynamic property key: {}".format(dynamic_property_key))
+ session.transfer(flow_file, REL_SUCCESS)
diff --git
a/docker/test/integration/resources/python/NifiStyleLogDynamicProperties.py
b/docker/test/integration/resources/python/NifiStyleLogDynamicProperties.py
new file mode 100644
index 000000000..dc989f6b0
--- /dev/null
+++ b/docker/test/integration/resources/python/NifiStyleLogDynamicProperties.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor,
StandardValidators
+
+
+class NifiStyleLogDynamicProperties(FlowFileTransform):
+
+ class Java:
+ implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+ class ProcessorDetails:
+ version = '1.2.3'
+ description = "Test processor"
+ dependencies = []
+
+ STATIC_PROPERTY = PropertyDescriptor(
+ name="Static Property",
+ description="A dummy static property",
+ required=True,
+ validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+ expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+ )
+
+ def __init__(self, **kwargs):
+ pass
+
+ def getPropertyDescriptors(self):
+ return [self.STATIC_PROPERTY]
+
+ def getDynamicPropertyDescriptor(self, propertyname):
+ return PropertyDescriptor(name=propertyname,
+ description="A user-defined property",
+ dynamic=True)
+
+ def transform(self, context, flow_file):
+ property_value = context.getProperty("Static Property")
+ self.logger.info("Static Property value:
{}".format(property_value.getValue()))
+ dyn_property_value = context.getProperty("Dynamic Property")
+ self.logger.info("Dynamic Property value:
{}".format(dyn_property_value.getValue()))
+
+ return FlowFileTransformResult('success', contents="content")
diff --git a/extensions/python/ExecutePythonProcessor.h
b/extensions/python/ExecutePythonProcessor.h
index 9fbe22f4d..ae9042428 100644
--- a/extensions/python/ExecutePythonProcessor.h
+++ b/extensions/python/ExecutePythonProcessor.h
@@ -48,7 +48,10 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
python_logger_ =
core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName(),
metadata.uuid);
}
- EXTENSIONAPI static constexpr const char* Description = "Executes a script
given the flow file and a process session. "
+ EXTENSIONAPI static constexpr const char* Description = "DEPRECATED. This
processor should only be used internally for running NiFi and MiNiFi C++ style
python processors. "
+ "Do not use this processor in your own flows, move your python
processors to the minifi-python directory instead, where they will be parsed, "
+ "and then they can be used with their filename as the processor class in
the flow configuration.\n\n"
+ "This processor executes a script given the flow file and a process
session. "
"The script is responsible for handling the incoming flow file (transfer
to SUCCESS or remove, e.g.) as well as "
"any flow files created by the script. If the handling is incomplete or
incorrect, the session will be rolled back. Scripts must define an onTrigger
function which accepts NiFi Context "
"and ProcessSession objects. Scripts are executed once when the
processor is run, then the onTrigger method is called for each incoming
flowfile. This enables scripts to keep state "
@@ -86,7 +89,12 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_ALLOWED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ bool supportsDynamicProperties() const override { return python_dynamic_; }
+ bool supportsDynamicRelationships() const override { return
SupportsDynamicRelationships; }
+ minifi::core::annotation::Input getInputRequirement() const override {
return InputRequirement; }
+ bool isSingleThreaded() const override { return IsSingleThreaded; }
+ ADD_GET_PROCESSOR_NAME
void initializeScript();
void initialize() override;
@@ -105,10 +113,6 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
return python_properties_;
}
- bool getPythonSupportDynamicProperties() {
- return python_dynamic_;
- }
-
void setDescription(const std::string &description) {
description_ = description;
}
diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md
index 7152c23b0..b5b8ae658 100644
--- a/extensions/python/PYTHON.md
+++ b/extensions/python/PYTHON.md
@@ -16,7 +16,7 @@
# Apache NiFi - MiNiFi - Python Processors Readme
-This readme defines the configuration parameters to use ExecutePythonProcessor
to run native python processors.
+This readme defines the configuration parameters to use ExecutePythonProcessor
to run native python processors. ExecutePythonProcessor is not used explicitly
in the flow; instead, the python processors are referenced directly by their
filename in the flow configuration, and ExecutePythonProcessor only manages the
execution of these processors under the hood.
## Table of Contents
- [Requirements](#requirements)
@@ -76,9 +76,24 @@ export
LD_LIBRARY_PATH="${PYENV_ROOT}/versions/${PY_VERSION}/lib${LD_LIBRARY_PAT
## Description
Python native processors can be updated at any time by simply adding a new
processor to the directory defined in
-the configuration options. The processor name, when provided to MiNiFi C++ and
any C2 manifest will be that
+the `minifi.properties` file in the `nifi.python.processor.dir` property. The
processor name, when provided to MiNiFi C++ and any C2 manifest will be that
of the name of the python script. For example, "AttributePrinter.py" will be
named and referenced in the flow
-as "org.apache.nifi.minifi.processors.AttributePrinter"
+as "org.apache.nifi.minifi.processors.AttributePrinter" and would look
something like this in the flow configuration:
+
+```yaml
+- name: My AttributePrinter
+ id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+ class: org.apache.nifi.minifi.processors.AttributePrinter
+ scheduling strategy: EVENT_DRIVEN
+ auto-terminated relationships list:
+ - failure
+ - success
+ - original
+ Properties:
+ Attributes To Print: filename,path
+```
+
+Every python processor has a success, a failure and an original relationship;
the original relationship is auto-terminated by default.
Methods that are enabled within the processor are describe, onSchedule,
onInitialize, and onTrigger.
diff --git a/extensions/python/PythonCreator.h
b/extensions/python/PythonCreator.h
index 337cc666e..65c8290bc 100644
--- a/extensions/python/PythonCreator.h
+++ b/extensions/python/PythonCreator.h
@@ -174,7 +174,7 @@ class PythonCreator : public
minifi::core::CoreComponentImpl {
.description_ = processor->getDescription(),
.class_properties_ = processor->getPythonProperties(),
.class_relationships_ = processor->getPythonRelationships(),
- .supports_dynamic_properties_ =
processor->getPythonSupportDynamicProperties(),
+ .supports_dynamic_properties_ = processor->supportsDynamicProperties(),
.inputRequirement_ = toString(processor->getInputRequirement()),
.isSingleThreaded_ = processor->isSingleThreaded()};
diff --git a/extensions/python/pythonprocessors/nifiapi/processorbase.py
b/extensions/python/pythonprocessors/nifiapi/processorbase.py
index 77f1b7114..7a884aeed 100644
--- a/extensions/python/pythonprocessors/nifiapi/processorbase.py
+++ b/extensions/python/pythonprocessors/nifiapi/processorbase.py
@@ -46,7 +46,13 @@ class ProcessorBase(ABC):
processor.setVersion(self.ProcessorDetails.version)
def onInitialize(self, processor: Processor):
- processor.setSupportsDynamicProperties()
+ get_dynamic_property_descriptor_attr = getattr(self,
'getDynamicPropertyDescriptor', None)
+ if get_dynamic_property_descriptor_attr and
callable(get_dynamic_property_descriptor_attr):
+ processor.setSupportsDynamicProperties()
+ self.supports_dynamic_properties = True
+ else:
+ self.supports_dynamic_properties = False
+
for property in self.getPropertyDescriptors():
property_type_code =
translateStandardValidatorToMiNiFiPropertype(property.validators)
expression_language_supported = True if
property.expressionLanguageScope != ExpressionLanguageScope.NONE else False
diff --git a/extensions/python/pythonprocessors/nifiapi/properties.py
b/extensions/python/pythonprocessors/nifiapi/properties.py
index e4f71a9b4..a0097f446 100644
--- a/extensions/python/pythonprocessors/nifiapi/properties.py
+++ b/extensions/python/pythonprocessors/nifiapi/properties.py
@@ -292,6 +292,8 @@ class ProcessContext:
expression_language_support = descriptor.expressionLanguageScope
!= ExpressionLanguageScope.NONE
controller_service_definition =
descriptor.controllerServiceDefinition
property_value = self.cpp_context.getProperty(property_name)
+ if not property_value and self.processor.supports_dynamic_properties:
+ property_value = self.cpp_context.getDynamicProperty(property_name)
return PythonPropertyValue(self.cpp_context, property_name,
property_value, expression_language_support, controller_service_definition)
def getStateManager(self) -> StateManager:
diff --git a/extensions/python/tests/PythonManifestTests.cpp
b/extensions/python/tests/PythonManifestTests.cpp
index 889ff31ab..1afbfb751 100644
--- a/extensions/python/tests/PythonManifestTests.cpp
+++ b/extensions/python/tests/PythonManifestTests.cpp
@@ -111,6 +111,11 @@ class MyPyProc3(FlowFileTransform):
def getPropertyDescriptors(self):
return [self.COLOR, self.MOOD]
+ def getDynamicPropertyDescriptor(self, propertyname):
+ return PropertyDescriptor(name=propertyname,
+ description="A user-defined property",
+ dynamic=True)
+
def transform(self, context, flow_file):
color = context.getProperty(self.COLOR).getValue()
mood =
context.getProperty(self.MOOD).evaluateAttributeExpressions(flowfile).getValue()
or "OK"
diff --git a/extensions/python/types/PyProcessContext.cpp
b/extensions/python/types/PyProcessContext.cpp
index 4d9e9af0d..d34de6603 100644
--- a/extensions/python/types/PyProcessContext.cpp
+++ b/extensions/python/types/PyProcessContext.cpp
@@ -30,11 +30,13 @@ namespace org::apache::nifi::minifi::extensions::python {
static PyMethodDef PyProcessContext_methods[] = { //
NOLINT(cppcoreguidelines-avoid-c-arrays)
{"getProperty", (PyCFunction) PyProcessContext::getProperty, METH_VARARGS,
nullptr},
+ {"getDynamicProperty", (PyCFunction) PyProcessContext::getDynamicProperty,
METH_VARARGS, nullptr},
+ {"getDynamicPropertyKeys", (PyCFunction)
PyProcessContext::getDynamicPropertyKeys, METH_VARARGS, nullptr},
{"getStateManager", (PyCFunction) PyProcessContext::getStateManager,
METH_VARARGS, nullptr},
{"getControllerService", (PyCFunction)
PyProcessContext::getControllerService, METH_VARARGS, nullptr},
{"getName", (PyCFunction) PyProcessContext::getName, METH_VARARGS,
nullptr},
{"getProperties", (PyCFunction) PyProcessContext::getProperties,
METH_VARARGS, nullptr},
- {"yieldResources", (PyCFunction) PyProcessContext::getProperties,
METH_VARARGS, nullptr},
+ {"yieldResources", (PyCFunction) PyProcessContext::yieldResources,
METH_VARARGS, nullptr},
{} /* Sentinel */
};
@@ -100,6 +102,54 @@ PyObject* PyProcessContext::getProperty(PyProcessContext*
self, PyObject* args)
Py_RETURN_NONE;
}
+PyObject* PyProcessContext::getDynamicProperty(PyProcessContext* self,
PyObject* args) {
+ auto context = self->process_context_;
+ if (!context) {
+ PyErr_SetString(PyExc_AttributeError, "tried reading process context
outside 'on_trigger'");
+ return nullptr;
+ }
+
+ const char* property_name = nullptr;
+ PyObject* script_flow_file = nullptr;
+ if (!PyArg_ParseTuple(args, "s|O", &property_name, &script_flow_file)) {
+ return nullptr;
+ }
+
+ if (!script_flow_file) {
+ if (const auto property_value = context->getDynamicProperty(property_name,
nullptr)) {
+ return object::returnReference(*property_value);
+ }
+ Py_RETURN_NONE;
+ }
+ auto py_flow = reinterpret_cast<PyScriptFlowFile*>(script_flow_file);
+ const auto flow_file = py_flow->script_flow_file_.lock();
+ if (!flow_file) {
+ PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside
'on_trigger'");
+ return nullptr;
+ }
+
+ if (const auto property_value = context->getDynamicProperty(property_name,
flow_file.get())) {
+ return object::returnReference(*property_value);
+ }
+ Py_RETURN_NONE;
+}
+
+PyObject* PyProcessContext::getDynamicPropertyKeys(PyProcessContext* self,
PyObject*) {
+ auto context = self->process_context_;
+ if (!context) {
+ PyErr_SetString(PyExc_AttributeError, "tried reading process context
outside 'on_trigger'");
+ return nullptr;
+ }
+
+ auto property_keys = context->getDynamicPropertyKeys();
+ auto py_properties = OwnedList::create();
+ for (const auto& property_name : property_keys) {
+ py_properties.append(property_name);
+ }
+
+ return object::returnReference(py_properties);
+}
+
PyObject* PyProcessContext::getStateManager(PyProcessContext* self, PyObject*)
{
auto context = self->process_context_;
if (!context) {
diff --git a/extensions/python/types/PyProcessContext.h
b/extensions/python/types/PyProcessContext.h
index faed8c021..82ad13abe 100644
--- a/extensions/python/types/PyProcessContext.h
+++ b/extensions/python/types/PyProcessContext.h
@@ -37,6 +37,8 @@ struct PyProcessContext {
static int init(PyProcessContext* self, PyObject* args, PyObject* kwds);
static PyObject* getProperty(PyProcessContext* self, PyObject* args);
+ static PyObject* getDynamicProperty(PyProcessContext* self, PyObject* args);
+ static PyObject* getDynamicPropertyKeys(PyProcessContext* self, PyObject*
args);
static PyObject* getStateManager(PyProcessContext* self, PyObject* args);
static PyObject* getControllerService(PyProcessContext* self, PyObject*
args);
static PyObject* getName(PyProcessContext* self, PyObject* args);
diff --git a/extensions/python/types/PyProcessor.cpp
b/extensions/python/types/PyProcessor.cpp
index 11eda4713..d39b9c3ed 100644
--- a/extensions/python/types/PyProcessor.cpp
+++ b/extensions/python/types/PyProcessor.cpp
@@ -126,7 +126,7 @@ PyObject* PyProcessor::addProperty(PyProcessor* self,
PyObject* args) {
return nullptr;
}
- static constexpr Py_ssize_t ExpectedNumArgs = 9;
+ static constexpr Py_ssize_t ExpectedNumArgs = 2;
auto arg_size = PyTuple_Size(args);
if (arg_size < ExpectedNumArgs) {
PyErr_SetString(PyExc_AttributeError, fmt::format("addProperty was called
with too few arguments: need {}, got {}", ExpectedNumArgs, arg_size).c_str());
@@ -136,44 +136,55 @@ PyObject* PyProcessor::addProperty(PyProcessor* self,
PyObject* args) {
BorrowedStr name = BorrowedStr::fromTuple(args, 0);
BorrowedStr description = BorrowedStr::fromTuple(args, 1);
std::optional<std::string> default_value;
- auto default_value_pystr = BorrowedStr::fromTuple(args, 2);
- if (default_value_pystr.get() && default_value_pystr.get() != Py_None) {
- default_value = default_value_pystr.toUtf8String();
+ if (arg_size > 2) {
+ auto default_value_pystr = BorrowedStr::fromTuple(args, 2);
+ if (default_value_pystr.get() && default_value_pystr.get() != Py_None) {
+ default_value = default_value_pystr.toUtf8String();
+ }
}
bool is_required = false;
bool supports_expression_language = false;
bool sensitive = false;
- try {
+ if (arg_size > 3) {
is_required = getBoolFromTuple(args, 3);
+ }
+
+ if (arg_size > 4) {
supports_expression_language = getBoolFromTuple(args, 4);
+ }
+ if (arg_size > 5) {
sensitive = getBoolFromTuple(args, 5);
- } catch (const PyException&) {
- return nullptr;
}
std::optional<int64_t> validator_value;
- auto validator_value_pyint = BorrowedLong::fromTuple(args, 6);
- if (validator_value_pyint.get() && validator_value_pyint.get() != Py_None) {
- validator_value = validator_value_pyint.asInt64();
+ if (arg_size > 6) {
+ auto validator_value_pyint = BorrowedLong::fromTuple(args, 6);
+ if (validator_value_pyint.get() && validator_value_pyint.get() != Py_None)
{
+ validator_value = validator_value_pyint.asInt64();
+ }
}
std::vector<std::string> allowable_values_str;
- auto allowable_values_pylist = BorrowedList::fromTuple(args, 7);
- if (allowable_values_pylist.get() && allowable_values_pylist.get() !=
Py_None) {
- for (size_t i = 0; i < allowable_values_pylist.length(); ++i) {
- auto value = BorrowedStr{allowable_values_pylist[i]};
- allowable_values_str.push_back(value.toUtf8String());
+ if (arg_size > 7) {
+ auto allowable_values_pylist = BorrowedList::fromTuple(args, 7);
+ if (allowable_values_pylist.get() && allowable_values_pylist.get() !=
Py_None) {
+ for (size_t i = 0; i < allowable_values_pylist.length(); ++i) {
+ auto value = BorrowedStr{allowable_values_pylist[i]};
+ allowable_values_str.push_back(value.toUtf8String());
+ }
}
}
std::vector<std::string_view> allowable_values(begin(allowable_values_str),
end(allowable_values_str));
std::optional<std::string> controller_service_type_name;
- auto controller_service_type_name_pystr = BorrowedStr::fromTuple(args, 8);
- if (controller_service_type_name_pystr.get() &&
controller_service_type_name_pystr.get() != Py_None) {
- controller_service_type_name =
controller_service_type_name_pystr.toUtf8String();
+ if (arg_size > 8) {
+ auto controller_service_type_name_pystr = BorrowedStr::fromTuple(args, 8);
+ if (controller_service_type_name_pystr.get() &&
controller_service_type_name_pystr.get() != Py_None) {
+ controller_service_type_name =
controller_service_type_name_pystr.toUtf8String();
+ }
}
processor->addProperty(name.toUtf8String(), description.toUtf8String(),
default_value, is_required, supports_expression_language, sensitive,