This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 26600b79b5 NIFI-13673 Add testing and documentation for dependent
properties in Python processors (#10388)
26600b79b5 is described below
commit 26600b79b5ce5be9f900fd69eebe7709c60f8d17
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Oct 17 21:06:41 2025 +0200
NIFI-13673 Add testing and documentation for dependent properties in Python
processors (#10388)
Signed-off-by: David Handermann <[email protected]>
---
.../src/main/asciidoc/python-developer-guide.adoc | 43 +++++++
.../PythonControllerInteractionIT.java | 29 +++++
.../resources/extensions/ConditionalProcessor.py | 94 +++++++++++++++
.../src/main/python/src/nifiapi/properties.py | 131 ++++++++++++++++++++-
4 files changed, 292 insertions(+), 5 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
index ec08d7f866..f927ff8740 100644
--- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
@@ -389,6 +389,49 @@ def getDynamicPropertyDescriptor(self, propertyname):
If this method is not implemented and a user adds a property other than those
that are explicitly supported, the Processor will become
invalid. Of course, we might also specify explicit validators that can be
used, etc.
+==== Dependent Properties
+
+Python-based processors can express property dependencies in the same way as
Java processors. The
+`nifiapi.properties.PropertyDependency` class mirrors the Java builder's
`dependsOn(...)` method and is exposed via the
+`dependencies` keyword argument on `PropertyDescriptor`.
+
+NiFi's discovery logic inspects the processor source code using Python's
abstract syntax tree. When declaring a
+dependency, the referenced descriptor must therefore appear as a simple name,
not as an attribute such as
+`self.output_mode`. The common pattern is to assign the descriptor to a local
variable, reference that variable inside
+`PropertyDependency`, and only then assign the descriptor to an instance field
for later reuse.
+
+----
+from nifiapi.properties import PropertyDescriptor, PropertyDependency
+
+output_mode = PropertyDescriptor(
+ name='Output Mode',
+ allowable_values=['text', 'json'],
+ default_value='text',
+ required=True,
+)
+
+uppercase = PropertyDescriptor(
+ name='Uppercase',
+ allowable_values=['true', 'false'],
+ default_value='false',
+ dependencies=[PropertyDependency(output_mode, 'text')],
+)
+
+json_field = PropertyDescriptor(
+ name='JSON Field Name',
+ required=True,
+ dependencies=[PropertyDependency(output_mode, 'json')],
+)
+
+# Optionally assign to instance attributes for reuse
+self.output_mode = output_mode
+self.uppercase = uppercase
+self.json_field = json_field
+----
+
+At runtime the NiFi UI hides or disables dependent properties until the
prerequisite property is configured with one of the specified
+values, and the framework applies the same rules during validation.
+
==== Referencing Controller Services
Python processors can reference existing Java controller services using the
same mechanism as Java processors. Set the
diff --git
a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 1e83deda2b..5969992afc 100644
---
a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -775,6 +775,35 @@ public class PythonControllerInteractionIT {
output.assertContentEquals(expectedContent);
}
+ @Test
+ public void testConditionalProcessorWithDependencies() {
+ final TestRunner runner =
createFlowFileTransform("ConditionalProcessor");
+
+ // Text mode with uppercase dependency
+ runner.setProperty("Output Mode", "text");
+ runner.setProperty("Uppercase", "true");
+ runner.setProperty("Payload Text", "${filename}");
+ runner.enqueue("body", Map.of("filename", "nifi"));
+ runner.run();
+
+ runner.assertTransferCount("success", 1);
+ MockFlowFile textOutput =
runner.getFlowFilesForRelationship("success").getFirst();
+ textOutput.assertContentEquals("NIFI");
+ textOutput.assertAttributeEquals("output.mode", "text");
+
+ // Reset and exercise JSON branch (requires JSON Field Name dependency)
+ runner.clearTransferState();
+ runner.setProperty("Output Mode", "json");
+ runner.setProperty("JSON Field Name", "msg");
+ runner.enqueue("body", Map.of("filename", "payload"));
+ runner.run();
+
+ runner.assertTransferCount("success", 1);
+ MockFlowFile jsonOutput =
runner.getFlowFilesForRelationship("success").getFirst();
+ jsonOutput.assertContentEquals("{\"msg\": \"payload\"}");
+ jsonOutput.assertAttributeEquals("output.mode", "json");
+ }
+
private TestRunner createStateManagerTesterProcessor(String methodToTest) {
final TestRunner runner = createProcessor("TestStateManager");
runner.setProperty("StateManager Method To Test", methodToTest);
diff --git
a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/ConditionalProcessor.py
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/ConditionalProcessor.py
new file mode 100644
index 0000000000..775d772ac8
--- /dev/null
+++
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/ConditionalProcessor.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+from nifiapi.properties import (
+ PropertyDescriptor,
+ PropertyDependency,
+ StandardValidators,
+ ExpressionLanguageScope,
+)
+
+
class ConditionalProcessor(FlowFileTransform):
    """FlowFileTransform that demonstrates dependent properties.

    'Output Mode' selects between a plain-text and a JSON payload. 'Uppercase'
    depends on Output Mode being 'text'; 'JSON Field Name' depends on Output
    Mode being 'json'. 'Payload Text' has no dependency and supplies the
    message for both modes.
    """

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Demonstrates property dependencies by switching between text and JSON output modes.'
        tags = ['dependency', 'demo', 'python']

    def __init__(self, **kwargs):
        super().__init__()

        # Each descriptor is bound to a plain local name first so that the
        # PropertyDependency(...) calls below reference a simple name rather
        # than an attribute such as self.output_mode.
        output_mode = PropertyDescriptor(
            name='Output Mode',
            description='Determines the format of the emitted FlowFile.',
            allowable_values=['text', 'json'],
            default_value='text',
            required=True,
            validators=[StandardValidators.NON_EMPTY_VALIDATOR]
        )

        uppercase = PropertyDescriptor(
            name='Uppercase',
            description='If set true the text payload is uppercased (text mode only).',
            allowable_values=['true', 'false'],
            default_value='false',
            dependencies=[PropertyDependency(output_mode, 'text')],
            validators=[StandardValidators.BOOLEAN_VALIDATOR]
        )

        json_field = PropertyDescriptor(
            name='JSON Field Name',
            description='Field name inserted into the JSON payload (json mode only).',
            required=True,
            default_value='message',
            dependencies=[PropertyDependency(output_mode, 'json')],
            validators=[StandardValidators.NON_EMPTY_VALIDATOR]
        )

        payload_text = PropertyDescriptor(
            name='Payload Text',
            description='Message written to the payload. Supports Expression Language.',
            default_value='Hello from NiFi!',
            expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
            validators=[StandardValidators.ALWAYS_VALID]
        )

        # Keep the descriptors on the instance so transform() can look them up by name.
        self.output_mode = output_mode
        self.uppercase = uppercase
        self.json_field = json_field
        self.payload_text = payload_text

        self.descriptors = [output_mode, uppercase, json_field, payload_text]

    def getPropertyDescriptors(self):
        """Return the descriptors built in __init__."""
        return self.descriptors

    def transform(self, context, flowfile):
        """Build the FlowFile contents according to the configured output mode.

        In 'json' mode the contents are a one-member JSON object mapping the
        configured field name to the evaluated payload text. In any other mode
        the contents are the payload text, uppercased when 'Uppercase' is
        'true'. The selected mode is written to the 'output.mode' attribute and
        the result is routed to 'success'.
        """
        # Imported locally so this method does not rely on a module-level import.
        import json

        message = context.getProperty(self.payload_text.name).evaluateAttributeExpressions(flowfile).getValue()
        mode = context.getProperty(self.output_mode.name).getValue()

        if mode == 'json':
            field_name = context.getProperty(self.json_field.name).getValue()
            # json.dumps escapes quotes, backslashes and control characters, which
            # plain string formatting would leave as invalid JSON. The default
            # separators yield '{"field": "value"}'; ensure_ascii=False keeps
            # non-ASCII characters unescaped.
            contents = json.dumps({field_name: message}, ensure_ascii=False)
        else:
            uppercase = context.getProperty(self.uppercase.name).getValue().lower() == 'true'
            contents = message.upper() if uppercase else message

        attributes = {'output.mode': mode}
        return FlowFileTransformResult(relationship='success', attributes=attributes, contents=contents)
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
b/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
index 077aaf49b3..2b392d24c9 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-py4j-framework-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
@@ -14,9 +14,12 @@
# limitations under the License.
from enum import Enum
+import re
+
+from py4j.protocol import Py4JError
+
from nifiapi.componentstate import StateManager
from nifiapi.__jvm__ import JvmHolder
-import re
EMPTY_STRING_ARRAY =
JvmHolder.gateway.new_array(JvmHolder.jvm.java.lang.String, 0)
EMPTY_ALLOWABLE_VALUE_ARRAY =
JvmHolder.gateway.new_array(JvmHolder.jvm.org.apache.nifi.components.AllowableValue,
0)
@@ -251,6 +254,8 @@ class PropertyDescriptor:
return builder.build()
+
+
def __get_allowable_values(self, gateway):
if self.allowableValues is None:
return None
@@ -293,6 +298,106 @@ class PropertyDescriptor:
builder.identifiesExternalResource(cardinality, types[0], types[1:])
def _collect_descriptor_metadata(java_context):
    """Collect the property descriptors exposed by a context and their effective values.

    Descriptors are read first from ``java_context.getProperties()`` and then,
    when the context offers it, from
    ``java_context.getSupportedPropertyDescriptors()``. Either source may be
    missing; a context that provides neither yields empty results.

    :param java_context: object that may provide ``getProperties()`` and/or
        ``getSupportedPropertyDescriptors()``
    :return: tuple ``(descriptor_list, descriptor_lookup, value_lookup)`` where
        ``descriptor_list`` holds each descriptor once in first-seen order,
        ``descriptor_lookup`` maps property name to descriptor, and
        ``value_lookup`` maps property name to the configured value, falling
        back to the descriptor's default value when none is configured
    """
    descriptor_list = []
    descriptor_lookup = {}
    value_lookup = {}

    def register(descriptor):
        # Record a descriptor the first time its name is seen; return the name.
        name = descriptor.getName()
        if name not in descriptor_lookup:
            descriptor_list.append(descriptor)
            descriptor_lookup[name] = descriptor
        return name

    try:
        java_properties = java_context.getProperties()
    except AttributeError:
        java_properties = None

    if java_properties is not None:
        for entry in java_properties.entrySet():
            descriptor = entry.getKey()
            name = register(descriptor)
            configured_value = entry.getValue()
            value_lookup[name] = configured_value if configured_value is not None else descriptor.getDefaultValue()

    # The supported-descriptor accessor is optional, so probe for it instead of assuming it exists.
    supported_descriptor_supplier = getattr(java_context, 'getSupportedPropertyDescriptors', None)

    supported_descriptors = None
    if callable(supported_descriptor_supplier):
        try:
            supported_descriptors = supported_descriptor_supplier()
        except Py4JError:
            # The call failed on the Java side; continue with what getProperties() provided.
            supported_descriptors = None

    if supported_descriptors is not None:
        for descriptor in supported_descriptors:
            name = register(descriptor)
            # Never override a value that was already recorded from getProperties().
            if name not in value_lookup:
                value_lookup[name] = descriptor.getDefaultValue()

    return descriptor_list, descriptor_lookup, value_lookup
+
+
def _dependent_values_iter(dependency):
    """Return the values accepted by a dependency as a Python list.

    :param dependency: object exposing ``getDependentValues()``
    :return: ``None`` when ``getDependentValues()`` returns ``None``, otherwise
        a new list holding the returned values in iteration order
    """
    dependent_values = dependency.getDependentValues()
    if dependent_values is None:
        return None
    return list(dependent_values)
+
+
def _dependency_met(dependency, descriptor_lookup, value_lookup, cache, visiting):
    """Return True when one dependency of a descriptor is fulfilled."""
    dependency_name = dependency.getPropertyName()
    dependency_descriptor = descriptor_lookup.get(dependency_name)
    if dependency_descriptor is None:
        # The property depended upon is unknown to this context.
        return False

    # The property depended upon must itself be applicable.
    if not _dependencies_satisfied(dependency_descriptor, descriptor_lookup, value_lookup, cache, visiting):
        return False

    dependency_value = value_lookup.get(dependency_name)
    dependent_values = _dependent_values_iter(dependency)
    if dependent_values is None:
        # No explicit values listed: any non-None value fulfils the dependency.
        return dependency_value is not None
    return dependency_value in dependent_values


def _dependencies_satisfied(descriptor, descriptor_lookup, value_lookup, cache, visiting):
    """Tell whether every dependency of ``descriptor`` is fulfilled.

    A descriptor without dependencies is always satisfied. Otherwise each
    dependency must reference a known descriptor that is itself satisfied and
    whose value in ``value_lookup`` matches the dependency (one of the listed
    values, or any non-None value when no values are listed).

    :param descriptor: descriptor exposing ``getName()`` and ``getDependencies()``
    :param descriptor_lookup: mapping of property name to descriptor
    :param value_lookup: mapping of property name to effective value
    :param cache: dict of already computed results keyed by property name;
        updated in place
    :param visiting: set of property names currently being evaluated, used to
        detect circular dependencies; restored before returning
    :return: True when all dependencies are fulfilled, False otherwise
    """
    name = descriptor.getName()
    if name in cache:
        return cache[name]

    if name in visiting:
        # Circular dependency detected; treat as unsatisfied to avoid infinite recursion
        cache[name] = False
        return False

    dependencies = descriptor.getDependencies()
    if dependencies is None or dependencies.isEmpty():
        cache[name] = True
        return True

    visiting.add(name)
    try:
        # all() stops at the first unfulfilled dependency.
        satisfied = all(
            _dependency_met(dependency, descriptor_lookup, value_lookup, cache, visiting)
            for dependency in dependencies
        )
    finally:
        visiting.discard(name)

    cache[name] = satisfied
    return satisfied
+
+
class PropertyContext:
__trivial_attribute_reference__ = re.compile(r"\$\{([^${}\[\],:;/*\'
\t\r\n\\d][^${}\[\],:;/*\' \t\r\n]*)}")
__escaped_attribute_reference__ = re.compile(r"\$\{'([^${}\[\],:;/*\'
\t\r\n\\d][^${}\[\],:;/*\'\t\r\n]*)'}")
@@ -328,15 +433,24 @@ class ProcessContext(PropertyContext):
def __init__(self, java_context):
self.java_context = java_context
+ descriptors, descriptor_lookup, value_lookup =
_collect_descriptor_metadata(java_context)
+ dependency_cache = {}
- descriptors = java_context.getProperties().keySet()
- self.name = java_context.getName()
+ get_name = getattr(java_context, 'getName', None)
+ self.name = get_name() if callable(get_name) else None
self.property_values = {}
self.descriptor_value_map = {}
for descriptor in descriptors:
+ if not _dependencies_satisfied(descriptor, descriptor_lookup,
value_lookup, dependency_cache, set()):
+ continue
+
property_value = java_context.getProperty(descriptor.getName())
string_value = property_value.getValue()
+ if string_value is None:
+ string_value = value_lookup.get(descriptor.getName())
+ else:
+ value_lookup[descriptor.getName()] = string_value
property_value =
self.create_python_property_value(descriptor.isExpressionLanguageSupported(),
property_value, string_value)
self.property_values[descriptor.getName()] = property_value
@@ -358,14 +472,21 @@ class ValidationContext(PropertyContext):
def __init__(self, java_context):
self.java_context = java_context
-
- descriptors = java_context.getProperties().keySet()
+ descriptors, descriptor_lookup, value_lookup =
_collect_descriptor_metadata(java_context)
+ dependency_cache = {}
self.property_values = {}
self.descriptor_value_map = {}
for descriptor in descriptors:
+ if not _dependencies_satisfied(descriptor, descriptor_lookup,
value_lookup, dependency_cache, set()):
+ continue
+
property_value = java_context.getProperty(descriptor)
string_value = property_value.getValue()
+ if string_value is None:
+ string_value = value_lookup.get(descriptor.getName())
+ else:
+ value_lookup[descriptor.getName()] = string_value
property_value =
self.create_python_property_value(descriptor.isExpressionLanguageSupported(),
property_value, string_value)
self.property_values[descriptor.getName()] = property_value