This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 97ffddd1fc1cc8dc8f979e55fcd81ce3262972d4 Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Oct 27 15:04:14 2025 +0100 MINIFICPP-2654 Evaluate EL expressions only when it is explicitly called in NiFi python processors Signed-off-by: Ferenc Gerlits <[email protected]> Closes #2050 --- docker/test/integration/cluster/ImageStore.py | 4 +- docker/test/integration/features/python.feature | 25 +++++++ .../EvaluateExpressionLanguageChecker.py | 26 +++++++ .../python/EvaluateExpressionLanguageChecker.py | 83 ++++++++++++++++++++++ .../python/pythonprocessors/nifiapi/properties.py | 33 +++++---- extensions/python/types/PyProcessContext.cpp | 40 +++++++++++ extensions/python/types/PyProcessContext.h | 2 + 7 files changed, 199 insertions(+), 14 deletions(-) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 9fe772dcb..7d6c203ea 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -214,6 +214,7 @@ class ImageStore: COPY NifiStyleLogDynamicProperties.py {minifi_python_dir}/nifi_python_processors/NifiStyleLogDynamicProperties.py COPY LogDynamicProperties.py {minifi_python_dir}/LogDynamicProperties.py COPY ExpressionLanguagePropertyWithValidator.py {minifi_python_dir}/nifi_python_processors/ExpressionLanguagePropertyWithValidator.py + COPY EvaluateExpressionLanguageChecker.py {minifi_python_dir}/nifi_python_processors/EvaluateExpressionLanguageChecker.py RUN python3 -m venv {minifi_python_venv_parent}/venv """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, pip3_install_command=pip3_install_command, @@ -239,7 +240,8 @@ class ImageStore: build_full_python_resource_path("TestStateManager.py"), build_full_python_resource_path("NifiStyleLogDynamicProperties.py"), build_full_python_resource_path("LogDynamicProperties.py"), - build_full_python_resource_path("ExpressionLanguagePropertyWithValidator.py") + build_full_python_resource_path("ExpressionLanguagePropertyWithValidator.py"), + build_full_python_resource_path("EvaluateExpressionLanguageChecker.py") ]) def __build_minifi_cpp_image_with_llamacpp_model(self): diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 1f66d7b3e..0dad156d8 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -279,3 +279,28 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then the Minifi logs contain the following message: "ProcessSession rollback for ExpressionLanguagePropertyWithValidator" in less than 60 seconds + + Scenario: MiNiFi C++ can use evaluate expression language expressions correctly using the NiFi python API + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a UpdateAttribute processor with the "my.attribute" property set to "my.value" + And a EvaluateExpressionLanguageChecker processor + And the "EL Property" property of the EvaluateExpressionLanguageChecker processor is set to "${my.attribute:toUpper()}" + And the "Non EL Property" property of the EvaluateExpressionLanguageChecker processor is set to "non el ${my.attribute}" + And the "My Dynamic Property" property of the EvaluateExpressionLanguageChecker processor is set to "Dynamic ${my.attribute}" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And python processors without dependencies are present on the MiNiFi agent + + And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute + And the "success" relationship of the UpdateAttribute processor is connected to the EvaluateExpressionLanguageChecker + And the "success" relationship of the EvaluateExpressionLanguageChecker processor is connected to the PutFile + + When all instances start up + + Then one flowfile with the contents "Check successful!" is placed in the monitored directory in less than 30 seconds + And the Minifi logs contain the following message: "EL Property value: ${my.attribute:toUpper()}" in less than 1 seconds + And the Minifi logs contain the following message: "Evaluated EL Property value: MY.VALUE" in less than 1 seconds + And the Minifi logs contain the following message: "Non EL Property value: non el ${my.attribute}" in less than 1 seconds + And the Minifi logs contain the following message: "Evaluated Non EL Property value: non el ${my.attribute}" in less than 1 seconds + And the Minifi logs contain the following message: "Non-existent property value is empty" in less than 1 seconds + And the Minifi logs contain the following message: "My Dynamic Property value is: Dynamic ${my.attribute}" in less than 1 seconds + And the Minifi logs contain the following message: "My Dynamic Property evaluated value is: Dynamic my.value" in less than 1 seconds diff --git a/docker/test/integration/minifi/processors/EvaluateExpressionLanguageChecker.py b/docker/test/integration/minifi/processors/EvaluateExpressionLanguageChecker.py new file mode 100644 index 000000000..11f95053c --- /dev/null +++ b/docker/test/integration/minifi/processors/EvaluateExpressionLanguageChecker.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class EvaluateExpressionLanguageChecker(Processor): + def __init__(self, context): + super(EvaluateExpressionLanguageChecker, self).__init__( + context=context, + clazz='EvaluateExpressionLanguageChecker', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/EvaluateExpressionLanguageChecker.py b/docker/test/integration/resources/python/EvaluateExpressionLanguageChecker.py new file mode 100644 index 000000000..269e7ca47 --- /dev/null +++ b/docker/test/integration/resources/python/EvaluateExpressionLanguageChecker.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.properties import PropertyDescriptor, ExpressionLanguageScope + + +class EvaluateExpressionLanguageChecker(FlowFileTransform): + class Java: + implements = ["org.apache.nifi.python.processor.FlowFileTransform"] + + class ProcessorDetails: + version = "0.1.0" + description = "Processor to check property evaluates with expression language supported and not supported" + + EL_PROPERTY = PropertyDescriptor( + name="EL Property", + description="EL property", + expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES + ) + + NON_EL_PROPERTY = PropertyDescriptor( + name="Non EL Property", + description="Non EL property", + ) + + property_descriptors = [ + EL_PROPERTY, + NON_EL_PROPERTY + ] + + def __init__(self, **kwargs): + pass + + def getPropertyDescriptors(self): + return self.property_descriptors + + def getDynamicPropertyDescriptor(self, propertyname): + return PropertyDescriptor(name=propertyname, + description="A user-defined property", + dynamic=True) + + def transform(self, context, flowFile): + el_property = context.getProperty(self.EL_PROPERTY) + el_property_value = el_property.getValue() + self.logger.info("EL Property value: " + str(el_property_value)) + + el_property_value_evaluated = el_property.evaluateAttributeExpressions(flowFile).getValue() + self.logger.info("Evaluated EL Property value: " + str(el_property_value_evaluated)) + + non_el_property = context.getProperty(self.NON_EL_PROPERTY) + non_el_property_value = non_el_property.getValue() + self.logger.info("Non EL Property value: " + str(non_el_property_value)) + + non_el_property_value_evaluated = non_el_property.evaluateAttributeExpressions(flowFile).getValue() + self.logger.info("Evaluated Non EL Property value: " + str(non_el_property_value_evaluated)) + + non_existent_value = context.getProperty("non-existent-property").evaluateAttributeExpressions(flowFile).getValue() + if non_existent_value is None: + self.logger.info("Non-existent property value is empty") + + dynamic_property = context.getProperty("My Dynamic Property") + dynamic_property_value = dynamic_property.getValue() + if dynamic_property_value: + self.logger.info("My Dynamic Property value is: " + str(dynamic_property_value)) + + dynamic_property_evaluated_value = dynamic_property.evaluateAttributeExpressions(flowFile).getValue() + if dynamic_property_evaluated_value: + self.logger.info("My Dynamic Property evaluated value is: " + str(dynamic_property_evaluated_value)) + + return FlowFileTransformResult("success", contents="Check successful!") diff --git a/extensions/python/pythonprocessors/nifiapi/properties.py b/extensions/python/pythonprocessors/nifiapi/properties.py index a0097f446..aafb0b060 100644 --- a/extensions/python/pythonprocessors/nifiapi/properties.py +++ b/extensions/python/pythonprocessors/nifiapi/properties.py @@ -195,14 +195,13 @@ class FlowFile: class PythonPropertyValue: - def __init__(self, cpp_context: CppProcessContext, name: str, string_value: str, el_supported: bool, controller_service_definition: str): + def __init__(self, cpp_context: CppProcessContext, name: str, string_value: str, el_supported: bool, controller_service_definition: str, is_dynamic: bool = False): self.cpp_context = cpp_context - self.value = None + self.value = string_value self.name = name - if string_value is not None: - self.value = string_value self.el_supported = el_supported self.controller_service_definition = controller_service_definition + self.is_dynamic = is_dynamic def getValue(self) -> str: return self.value @@ -262,12 +261,17 @@ class PythonPropertyValue: return 0 def evaluateAttributeExpressions(self, flow_file: FlowFile = None): - if flow_file is None or not self.el_supported: - return self - # If Expression Language is supported and present, evaluate it and return a new PropertyValue. + # If Expression Language is supported, evaluate it and return a new PropertyValue. # Otherwise just return self, in order to avoid the cost of making the call to cpp for getProperty - new_string_value = self.cpp_context.getProperty(self.name, flow_file.cpp_flow_file) - return PythonPropertyValue(self.cpp_context, self.name, new_string_value, self.el_supported, self.controller_service_definition) + if not self.el_supported or not self.value: + return self + + new_string_value = None + if self.is_dynamic: + new_string_value = self.cpp_context.getDynamicProperty(self.name, flow_file.cpp_flow_file) + else: + new_string_value = self.cpp_context.getProperty(self.name, flow_file.cpp_flow_file) + return PythonPropertyValue(self.cpp_context, self.name, new_string_value, self.el_supported, self.controller_service_definition, self.is_dynamic) def asControllerService(self): if not self.controller_service_definition: @@ -291,10 +295,13 @@ class ProcessContext: property_name = descriptor.name expression_language_support = descriptor.expressionLanguageScope != ExpressionLanguageScope.NONE controller_service_definition = descriptor.controllerServiceDefinition - property_value = self.cpp_context.getProperty(property_name) - if not property_value and self.processor.supports_dynamic_properties: - property_value = self.cpp_context.getDynamicProperty(property_name) - return PythonPropertyValue(self.cpp_context, property_name, property_value, expression_language_support, controller_service_definition) + is_dynamic = False + property_value = self.cpp_context.getRawProperty(property_name) + if property_value is None and self.processor.supports_dynamic_properties: + property_value = self.cpp_context.getRawDynamicProperty(property_name) + if property_value is not None: + is_dynamic = True + return PythonPropertyValue(self.cpp_context, property_name, property_value, expression_language_support, controller_service_definition, is_dynamic) def getStateManager(self) -> StateManager: return StateManager(self.cpp_context.getStateManager()) diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 3724e5bc9..f207faff6 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -30,7 +30,9 @@ namespace org::apache::nifi::minifi::extensions::python { static PyMethodDef PyProcessContext_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {"getProperty", (PyCFunction) PyProcessContext::getProperty, METH_VARARGS, nullptr}, + {"getRawProperty", (PyCFunction) PyProcessContext::getRawProperty, METH_VARARGS, nullptr}, {"getDynamicProperty", (PyCFunction) PyProcessContext::getDynamicProperty, METH_VARARGS, nullptr}, + {"getRawDynamicProperty", (PyCFunction) PyProcessContext::getRawDynamicProperty, METH_VARARGS, nullptr}, {"getDynamicPropertyKeys", (PyCFunction) PyProcessContext::getDynamicPropertyKeys, METH_VARARGS, nullptr}, {"getStateManager", (PyCFunction) PyProcessContext::getStateManager, METH_VARARGS, nullptr}, {"getControllerService", (PyCFunction) PyProcessContext::getControllerService, METH_VARARGS, nullptr}, @@ -102,6 +104,25 @@ PyObject* PyProcessContext::getProperty(PyProcessContext* self, PyObject* args) Py_RETURN_NONE; } +PyObject* PyProcessContext::getRawProperty(PyProcessContext* self, PyObject* args) { + auto context = self->process_context_; + if (!context) { + PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); + return nullptr; + } + + const char* property_name = nullptr; + if (!PyArg_ParseTuple(args, "s", &property_name)) { + return nullptr; + } + + std::string property_str{property_name}; + if (const auto property_value = context->getRawProperty(property_str)) { + return object::returnReference(*property_value); + } + Py_RETURN_NONE; +} + PyObject* PyProcessContext::getDynamicProperty(PyProcessContext* self, PyObject* args) { auto context = self->process_context_; if (!context) { @@ -134,6 +155,25 @@ PyObject* PyProcessContext::getDynamicProperty(PyProcessContext* self, PyObject* Py_RETURN_NONE; } +PyObject* PyProcessContext::getRawDynamicProperty(PyProcessContext* self, PyObject* args) { + auto context = self->process_context_; + if (!context) { + PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); + return nullptr; + } + + const char* property_name = nullptr; + if (!PyArg_ParseTuple(args, "s", &property_name)) { + return nullptr; + } + + std::string property_str{property_name}; + if (const auto property_value = context->getRawDynamicProperty(property_str)) { + return object::returnReference(*property_value); + } + Py_RETURN_NONE; +} + PyObject* PyProcessContext::getDynamicPropertyKeys(PyProcessContext* self, PyObject*) { auto context = self->process_context_; if (!context) { diff --git a/extensions/python/types/PyProcessContext.h b/extensions/python/types/PyProcessContext.h index 4423751e6..71f306f93 100644 --- a/extensions/python/types/PyProcessContext.h +++ b/extensions/python/types/PyProcessContext.h @@ -37,7 +37,9 @@ struct PyProcessContext { static int init(PyProcessContext* self, PyObject* args, PyObject* kwds); static PyObject* getProperty(PyProcessContext* self, PyObject* args); + static PyObject* getRawProperty(PyProcessContext* self, PyObject* args); static PyObject* getDynamicProperty(PyProcessContext* self, PyObject* args); + static PyObject* getRawDynamicProperty(PyProcessContext* self, PyObject* args); static PyObject* getDynamicPropertyKeys(PyProcessContext* self, PyObject* args); static PyObject* getStateManager(PyProcessContext* self, PyObject* args); static PyObject* getControllerService(PyProcessContext* self, PyObject* args);
