This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e2aa192cc04220c608d2573bab885dde9b522aca Author: Gabor Gyimesi <[email protected]> AuthorDate: Fri Aug 16 13:45:22 2024 +0000 MINIFICPP-2436 Allow relative import paths in (NiFi) python processors Closes #1854 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/cluster/ImageStore.py | 8 +++++- docker/test/integration/features/python.feature | 14 +++++++++++ .../minifi/processors/RelativeImporterProcessor.py | 26 +++++++++++++++++++ .../resources/python/RelativeImporterProcessor.py | 29 ++++++++++++++++++++++ .../resources/python/multiplierutils.py | 17 +++++++++++++ .../integration/resources/python/subtractutils.py | 17 +++++++++++++ extensions/python/ExecutePythonProcessor.cpp | 1 + extensions/python/ExecutePythonProcessor.h | 5 ++++ extensions/python/PythonCreator.h | 7 ++++-- extensions/python/PythonObjectFactory.h | 15 +++++++++-- extensions/python/PythonScriptEngine.cpp | 20 +++++++++++++++ extensions/python/PythonScriptEngine.h | 1 + 12 files changed, 155 insertions(+), 5 deletions(-) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index e86707557..bd16a33dd 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -165,6 +165,9 @@ class ImageStore: COPY ProcessContextInterfaceChecker.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py COPY CreateFlowFile.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py COPY FailureWithAttributes.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py + COPY subtractutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py + COPY RelativeImporterProcessor.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py + COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\ @@ -188,7 +191,10 @@ class ImageStore: os.path.join(self.test_dir, "resources", "python", "SpecialPropertyTypeChecker.py"), os.path.join(self.test_dir, "resources", "python", "ProcessContextInterfaceChecker.py"), os.path.join(self.test_dir, "resources", "python", "CreateFlowFile.py"), - os.path.join(self.test_dir, "resources", "python", "FailureWithAttributes.py")]) + os.path.join(self.test_dir, "resources", "python", "FailureWithAttributes.py"), + os.path.join(self.test_dir, "resources", "python", "RelativeImporterProcessor.py"), + os.path.join(self.test_dir, "resources", "python", "subtractutils.py"), + os.path.join(self.test_dir, "resources", "python", "multiplierutils.py")]) def __build_http_proxy_image(self): dockerfile = dedent("""\ diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index aa0a438ab..a59b74bdd 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -174,3 +174,17 @@ Feature: MiNiFi can use python processors in its flows Then the Minifi logs contain the following message: "key:error.message value:Error" in less than 60 seconds And the Minifi logs contain the following message: "key:my.attribute value:my.value" in less than 10 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: NiFi native python processors support relative imports + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a RelativeImporterProcessor processor + And a PutFile processor with the "Directory" property set to "/tmp/output" + And python is installed on the MiNiFi agent with a pre-created virtualenv + + And the "success" relationship of the GenerateFlowFile processor is connected to the RelativeImporterProcessor + And the "success" relationship of the RelativeImporterProcessor processor is connected to the PutFile + + When all instances start up + + Then one flowfile with the contents "The final result is 1990" is placed in the monitored directory in less than 30 seconds diff --git a/docker/test/integration/minifi/processors/RelativeImporterProcessor.py b/docker/test/integration/minifi/processors/RelativeImporterProcessor.py new file mode 100644 index 000000000..b02b3df6a --- /dev/null +++ b/docker/test/integration/minifi/processors/RelativeImporterProcessor.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class RelativeImporterProcessor(Processor): + def __init__(self, context): + super(RelativeImporterProcessor, self).__init__( + context=context, + clazz='RelativeImporterProcessor', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.compute.processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/RelativeImporterProcessor.py b/docker/test/integration/resources/python/RelativeImporterProcessor.py new file mode 100644 index 000000000..1ec2d254f --- /dev/null +++ b/docker/test/integration/resources/python/RelativeImporterProcessor.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from .multiplierutils import double +from ..subtractutils import minus_ten + + +class RelativeImporterProcessor(FlowFileTransform): + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + number = 1000 + number = double(number) + number = minus_ten(number) + return FlowFileTransformResult("success", contents="The final result is {}".format(number)) diff --git a/docker/test/integration/resources/python/multiplierutils.py b/docker/test/integration/resources/python/multiplierutils.py new file mode 100644 index 000000000..7b0bafa67 --- /dev/null +++ b/docker/test/integration/resources/python/multiplierutils.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def double(number): + return number * 2 diff --git a/docker/test/integration/resources/python/subtractutils.py b/docker/test/integration/resources/python/subtractutils.py new file mode 100644 index 000000000..854a1d3c0 --- /dev/null +++ b/docker/test/integration/resources/python/subtractutils.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def minus_ten(number): + return number - 10 diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index 115760e40..a1541498a 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -61,6 +61,7 @@ void ExecutePythonProcessor::initalizeThroughScriptEngine() { try { appendPathForImportModules(); python_script_engine_->appendModulePaths(python_paths_); + python_script_engine_->setModuleAttributes(qualified_module_name_); python_script_engine_->eval(script_to_exec_); if (python_class_name_) { python_script_engine_->initializeProcessorObject(*python_class_name_); diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index 59b561327..a244c910c 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -123,6 +123,10 @@ class ExecutePythonProcessor : public core::Processor { python_paths_ = python_paths; } + void setQualifiedModuleName(const std::string& qualified_module_name) { + qualified_module_name_ = qualified_module_name; + } + std::map<std::string, core::Property> getProperties() const override; std::vector<core::Relationship> getPythonRelationships(); @@ -149,6 +153,7 @@ class ExecutePythonProcessor : public core::Processor { std::unique_ptr<PythonScriptEngine> python_script_engine_; std::optional<std::string> python_class_name_; std::vector<std::filesystem::path> python_paths_; + std::string qualified_module_name_; void appendPathForImportModules(); void loadScriptFromFile(); diff --git a/extensions/python/PythonCreator.h b/extensions/python/PythonCreator.h index 13f139bb4..98717a2d5 100644 --- a/extensions/python/PythonCreator.h +++ b/extensions/python/PythonCreator.h @@ -78,6 +78,9 @@ class PythonCreator : public minifi::core::CoreComponent { full_name = utils::string::join_pack("org.apache.nifi.minifi.processors.", package, ".", script_name.string()); class_name = full_name; } + + const std::string qualified_module_name_prefix = "org.apache.nifi.minifi.processors."; + std::string qualified_module_name = full_name.substr(qualified_module_name_prefix.length()); if (path.string().find("nifi_python_processors") != std::string::npos) { auto utils_path = (std::filesystem::path("nifi_python_processors") / "utils").string(); if (path.string().find(utils_path) != std::string::npos) { @@ -85,11 +88,11 @@ class PythonCreator : public minifi::core::CoreComponent { } logger_->log_info("Registering NiFi python processor: {}", class_name); core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path.string(), script_name.string(), - PythonProcessorType::NIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, std::filesystem::path{pathListings.value()}, path.parent_path()})); + PythonProcessorType::NIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, std::filesystem::path{pathListings.value()}, path.parent_path()}, qualified_module_name)); } else { logger_->log_info("Registering MiNiFi python processor: {}", class_name); core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path.string(), script_name.string(), - PythonProcessorType::MINIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, std::filesystem::path{pathListings.value()}})); + PythonProcessorType::MINIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, std::filesystem::path{pathListings.value()}}, qualified_module_name)); } registered_classes_.push_back(class_name); try { diff --git a/extensions/python/PythonObjectFactory.h b/extensions/python/PythonObjectFactory.h index 3a0c0cd76..5c22b895a 100644 --- a/extensions/python/PythonObjectFactory.h +++ b/extensions/python/PythonObjectFactory.h @@ -35,11 +35,13 @@ enum class PythonProcessorType { class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjectFactory<org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor> { public: - explicit PythonObjectFactory(std::string file, std::string class_name, PythonProcessorType python_processor_type, const std::vector<std::filesystem::path>& python_paths) + explicit PythonObjectFactory(std::string file, std::string class_name, PythonProcessorType python_processor_type, + const std::vector<std::filesystem::path>& python_paths, std::string qualified_module_name) : file_(std::move(file)), class_name_(std::move(class_name)), python_paths_(python_paths), - python_processor_type_(python_processor_type) { + python_processor_type_(python_processor_type), + qualified_module_name_(std::move(qualified_module_name)) { } std::unique_ptr<org::apache::nifi::minifi::core::CoreComponent> create(const std::string &name) override { @@ -52,6 +54,7 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec ptr->setPythonClassName(class_name_); ptr->setPythonPaths(python_paths_); } + ptr->setQualifiedModuleName(qualified_module_name_); ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return ptr; @@ -67,6 +70,7 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec ptr->setPythonClassName(class_name_); ptr->setPythonPaths(python_paths_); } + ptr->setQualifiedModuleName(qualified_module_name_); ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return ptr; @@ -74,6 +78,11 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec org::apache::nifi::minifi::core::CoreComponent* createRaw(const std::string &name) override { auto ptr = dynamic_cast<org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor*>(DefaultObjectFactory::createRaw(name)); + if (python_processor_type_ == PythonProcessorType::NIFI_TYPE) { + ptr->setPythonClassName(class_name_); + ptr->setPythonPaths(python_paths_); + } + ptr->setQualifiedModuleName(qualified_module_name_); ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return dynamic_cast<org::apache::nifi::minifi::core::CoreComponent*>(ptr); @@ -85,6 +94,7 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec ptr->setPythonClassName(class_name_); ptr->setPythonPaths(python_paths_); } + ptr->setQualifiedModuleName(qualified_module_name_); ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return dynamic_cast<org::apache::nifi::minifi::core::CoreComponent*>(ptr); @@ -95,4 +105,5 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec std::string class_name_; std::vector<std::filesystem::path> python_paths_; PythonProcessorType python_processor_type_; + std::string qualified_module_name_; }; diff --git a/extensions/python/PythonScriptEngine.cpp b/extensions/python/PythonScriptEngine.cpp index 10893c195..60e661b05 100644 --- a/extensions/python/PythonScriptEngine.cpp +++ b/extensions/python/PythonScriptEngine.cpp @@ -243,4 +243,24 @@ std::vector<core::Relationship> PythonScriptEngine::getCustomPythonRelationships return relationships; } +void PythonScriptEngine::setModuleAttributes(const std::string& qualified_module_name) { + GlobalInterpreterLock gil; + size_t last_dot = qualified_module_name.find_last_of('.'); + + std::string qualified_package_name; + if (last_dot != std::string::npos) { + qualified_package_name = qualified_module_name.substr(0, last_dot); + } + + if (!qualified_package_name.empty()) { + auto package_name_object = OwnedObject(PyUnicode_FromStringAndSize(qualified_package_name.data(), gsl::narrow<Py_ssize_t>(qualified_package_name.length()))); + PyDict_SetItemString(bindings_.get(), "__package__", package_name_object.get()); + } + + if (!qualified_module_name.empty()) { + auto module_name_object = OwnedObject(PyUnicode_FromStringAndSize(qualified_module_name.data(), gsl::narrow<Py_ssize_t>(qualified_module_name.length()))); + PyDict_SetItemString(bindings_.get(), "__name__", module_name_object.get()); + } +} + } // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/PythonScriptEngine.h b/extensions/python/PythonScriptEngine.h index dd0248516..5823d6f1e 100644 --- a/extensions/python/PythonScriptEngine.h +++ b/extensions/python/PythonScriptEngine.h @@ -154,6 +154,7 @@ class PythonScriptEngine { void initialize(const core::Relationship& success, const core::Relationship& failure, const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger); void initializeProcessorObject(const std::string& python_class_name); std::vector<core::Relationship> getCustomPythonRelationships(); + void setModuleAttributes(const std::string& qualified_module_name); private: void evalInternal(std::string_view script);
