This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e2aa192cc04220c608d2573bab885dde9b522aca
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Fri Aug 16 13:45:22 2024 +0000

    MINIFICPP-2436 Allow relative import paths in (NiFi) python processors
    
    Closes #1854
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 docker/test/integration/cluster/ImageStore.py      |  8 +++++-
 docker/test/integration/features/python.feature    | 14 +++++++++++
 .../minifi/processors/RelativeImporterProcessor.py | 26 +++++++++++++++++++
 .../resources/python/RelativeImporterProcessor.py  | 29 ++++++++++++++++++++++
 .../resources/python/multiplierutils.py            | 17 +++++++++++++
 .../integration/resources/python/subtractutils.py  | 17 +++++++++++++
 extensions/python/ExecutePythonProcessor.cpp       |  1 +
 extensions/python/ExecutePythonProcessor.h         |  5 ++++
 extensions/python/PythonCreator.h                  |  7 ++++--
 extensions/python/PythonObjectFactory.h            | 15 +++++++++--
 extensions/python/PythonScriptEngine.cpp           | 20 +++++++++++++++
 extensions/python/PythonScriptEngine.h             |  1 +
 12 files changed, 155 insertions(+), 5 deletions(-)

diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py
index e86707557..bd16a33dd 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -165,6 +165,9 @@ class ImageStore:
                 COPY ProcessContextInterfaceChecker.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/ProcessContextInterfaceChecker.py
                 COPY CreateFlowFile.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateFlowFile.py
                 COPY FailureWithAttributes.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithAttributes.py
+                COPY subtractutils.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py
+                COPY RelativeImporterProcessor.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py
+                COPY multiplierutils.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py
                 RUN wget {parse_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     wget {chunk_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     echo 'langchain<=0.17.0' > 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt
 && \\
@@ -188,7 +191,10 @@ class ImageStore:
                                                os.path.join(self.test_dir, 
"resources", "python", "SpecialPropertyTypeChecker.py"),
                                                os.path.join(self.test_dir, 
"resources", "python", "ProcessContextInterfaceChecker.py"),
                                                os.path.join(self.test_dir, 
"resources", "python", "CreateFlowFile.py"),
-                                               os.path.join(self.test_dir, 
"resources", "python", "FailureWithAttributes.py")])
+                                               os.path.join(self.test_dir, 
"resources", "python", "FailureWithAttributes.py"),
+                                               os.path.join(self.test_dir, 
"resources", "python", "RelativeImporterProcessor.py"),
+                                               os.path.join(self.test_dir, 
"resources", "python", "subtractutils.py"),
+                                               os.path.join(self.test_dir, 
"resources", "python", "multiplierutils.py")])
 
     def __build_http_proxy_image(self):
         dockerfile = dedent("""\
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
index aa0a438ab..a59b74bdd 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -174,3 +174,17 @@ Feature: MiNiFi can use python processors in its flows
 
     Then the Minifi logs contain the following message: "key:error.message value:Error" in less than 60 seconds
     And the Minifi logs contain the following message: "key:my.attribute value:my.value" in less than 10 seconds
+
+  @USE_NIFI_PYTHON_PROCESSORS
+  Scenario: NiFi native python processors support relative imports
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a RelativeImporterProcessor processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And python is installed on the MiNiFi agent with a pre-created virtualenv
+
+    And the "success" relationship of the GenerateFlowFile processor is connected to the RelativeImporterProcessor
+    And the "success" relationship of the RelativeImporterProcessor processor is connected to the PutFile
+
+    When all instances start up
+
+    Then one flowfile with the contents "The final result is 1990" is placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/minifi/processors/RelativeImporterProcessor.py b/docker/test/integration/minifi/processors/RelativeImporterProcessor.py
new file mode 100644
index 000000000..b02b3df6a
--- /dev/null
+++ b/docker/test/integration/minifi/processors/RelativeImporterProcessor.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class RelativeImporterProcessor(Processor):
+    def __init__(self, context):
+        super(RelativeImporterProcessor, self).__init__(
+            context=context,
+            clazz='RelativeImporterProcessor',
+            
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.compute.processors.',
+            properties={},
+            schedule={'scheduling strategy': 'EVENT_DRIVEN'},
+            auto_terminate=[])
diff --git a/docker/test/integration/resources/python/RelativeImporterProcessor.py b/docker/test/integration/resources/python/RelativeImporterProcessor.py
new file mode 100644
index 000000000..1ec2d254f
--- /dev/null
+++ b/docker/test/integration/resources/python/RelativeImporterProcessor.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from .multiplierutils import double
+from ..subtractutils import minus_ten
+
+
+class RelativeImporterProcessor(FlowFileTransform):
+    def __init__(self, **kwargs):
+        pass
+
+    def transform(self, context, flowFile):
+        number = 1000
+        number = double(number)
+        number = minus_ten(number)
+        return FlowFileTransformResult("success", contents="The final result 
is {}".format(number))
diff --git a/docker/test/integration/resources/python/multiplierutils.py b/docker/test/integration/resources/python/multiplierutils.py
new file mode 100644
index 000000000..7b0bafa67
--- /dev/null
+++ b/docker/test/integration/resources/python/multiplierutils.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+def double(number):
+    return number * 2
diff --git a/docker/test/integration/resources/python/subtractutils.py b/docker/test/integration/resources/python/subtractutils.py
new file mode 100644
index 000000000..854a1d3c0
--- /dev/null
+++ b/docker/test/integration/resources/python/subtractutils.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+def minus_ten(number):
+    return number - 10
diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp
index 115760e40..a1541498a 100644
--- a/extensions/python/ExecutePythonProcessor.cpp
+++ b/extensions/python/ExecutePythonProcessor.cpp
@@ -61,6 +61,7 @@ void ExecutePythonProcessor::initalizeThroughScriptEngine() {
   try {
     appendPathForImportModules();
     python_script_engine_->appendModulePaths(python_paths_);
+    python_script_engine_->setModuleAttributes(qualified_module_name_);
     python_script_engine_->eval(script_to_exec_);
     if (python_class_name_) {
       python_script_engine_->initializeProcessorObject(*python_class_name_);
diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h
index 59b561327..a244c910c 100644
--- a/extensions/python/ExecutePythonProcessor.h
+++ b/extensions/python/ExecutePythonProcessor.h
@@ -123,6 +123,10 @@ class ExecutePythonProcessor : public core::Processor {
     python_paths_ = python_paths;
   }
 
+  /// Stores the dotted module name the processor script is evaluated as; it is handed to
+  /// PythonScriptEngine::setModuleAttributes right before the script is evaluated.
+  void setQualifiedModuleName(const std::string& qualified_module_name) {
+    qualified_module_name_.assign(qualified_module_name);
+  }
+
   std::map<std::string, core::Property> getProperties() const override;
 
   std::vector<core::Relationship> getPythonRelationships();
@@ -149,6 +153,7 @@ class ExecutePythonProcessor : public core::Processor {
   std::unique_ptr<PythonScriptEngine> python_script_engine_;
   std::optional<std::string> python_class_name_;
   std::vector<std::filesystem::path> python_paths_;
+  std::string qualified_module_name_;
 
   void appendPathForImportModules();
   void loadScriptFromFile();
diff --git a/extensions/python/PythonCreator.h b/extensions/python/PythonCreator.h
index 13f139bb4..98717a2d5 100644
--- a/extensions/python/PythonCreator.h
+++ b/extensions/python/PythonCreator.h
@@ -78,6 +78,9 @@ class PythonCreator : public minifi::core::CoreComponent {
         full_name = 
utils::string::join_pack("org.apache.nifi.minifi.processors.", package, ".", 
script_name.string());
         class_name = full_name;
       }
+
+      const std::string qualified_module_name_prefix = 
"org.apache.nifi.minifi.processors.";
+      std::string qualified_module_name = 
full_name.substr(qualified_module_name_prefix.length());
       if (path.string().find("nifi_python_processors") != std::string::npos) {
         auto utils_path = (std::filesystem::path("nifi_python_processors") / 
"utils").string();
         if (path.string().find(utils_path) != std::string::npos) {
@@ -85,11 +88,11 @@ class PythonCreator : public minifi::core::CoreComponent {
         }
         logger_->log_info("Registering NiFi python processor: {}", class_name);
         core::getClassLoader().registerClass(class_name, 
std::make_unique<PythonObjectFactory>(path.string(), script_name.string(),
-          PythonProcessorType::NIFI_TYPE, 
std::vector<std::filesystem::path>{python_lib_path, 
std::filesystem::path{pathListings.value()}, path.parent_path()}));
+          PythonProcessorType::NIFI_TYPE, 
std::vector<std::filesystem::path>{python_lib_path, 
std::filesystem::path{pathListings.value()}, path.parent_path()}, 
qualified_module_name));
       } else {
         logger_->log_info("Registering MiNiFi python processor: {}", 
class_name);
         core::getClassLoader().registerClass(class_name, 
std::make_unique<PythonObjectFactory>(path.string(), script_name.string(),
-          PythonProcessorType::MINIFI_TYPE, 
std::vector<std::filesystem::path>{python_lib_path, 
std::filesystem::path{pathListings.value()}}));
+          PythonProcessorType::MINIFI_TYPE, 
std::vector<std::filesystem::path>{python_lib_path, 
std::filesystem::path{pathListings.value()}}, qualified_module_name));
       }
       registered_classes_.push_back(class_name);
       try {
diff --git a/extensions/python/PythonObjectFactory.h b/extensions/python/PythonObjectFactory.h
index 3a0c0cd76..5c22b895a 100644
--- a/extensions/python/PythonObjectFactory.h
+++ b/extensions/python/PythonObjectFactory.h
@@ -35,11 +35,13 @@ enum class PythonProcessorType {
 
 class PythonObjectFactory : public 
org::apache::nifi::minifi::core::DefaultObjectFactory<org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor>
 {
  public:
-  explicit PythonObjectFactory(std::string file, std::string class_name, 
PythonProcessorType python_processor_type, const 
std::vector<std::filesystem::path>& python_paths)
+  explicit PythonObjectFactory(std::string file, std::string class_name, 
PythonProcessorType python_processor_type,
+    const std::vector<std::filesystem::path>& python_paths, std::string 
qualified_module_name)
       : file_(std::move(file)),
         class_name_(std::move(class_name)),
         python_paths_(python_paths),
-        python_processor_type_(python_processor_type) {
+        python_processor_type_(python_processor_type),
+        qualified_module_name_(std::move(qualified_module_name)) {
   }
 
   std::unique_ptr<org::apache::nifi::minifi::core::CoreComponent> create(const 
std::string &name) override {
@@ -52,6 +54,7 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec
       ptr->setPythonClassName(class_name_);
       ptr->setPythonPaths(python_paths_);
     }
+    ptr->setQualifiedModuleName(qualified_module_name_);
     ptr->initialize();
     
ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile,
 file_);
     return ptr;
@@ -67,6 +70,7 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec
       ptr->setPythonClassName(class_name_);
       ptr->setPythonPaths(python_paths_);
     }
+    ptr->setQualifiedModuleName(qualified_module_name_);
     ptr->initialize();
     
ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile,
 file_);
     return ptr;
@@ -74,6 +78,11 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec
 
   org::apache::nifi::minifi::core::CoreComponent* createRaw(const std::string 
&name) override {
     auto ptr = 
dynamic_cast<org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor*>(DefaultObjectFactory::createRaw(name));
+    if (python_processor_type_ == PythonProcessorType::NIFI_TYPE) {
+      ptr->setPythonClassName(class_name_);
+      ptr->setPythonPaths(python_paths_);
+    }
+    ptr->setQualifiedModuleName(qualified_module_name_);
     ptr->initialize();
     
ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile,
 file_);
     return dynamic_cast<org::apache::nifi::minifi::core::CoreComponent*>(ptr);
@@ -85,6 +94,7 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec
       ptr->setPythonClassName(class_name_);
       ptr->setPythonPaths(python_paths_);
     }
+    ptr->setQualifiedModuleName(qualified_module_name_);
     ptr->initialize();
     
ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile,
 file_);
     return dynamic_cast<org::apache::nifi::minifi::core::CoreComponent*>(ptr);
@@ -95,4 +105,5 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefaultObjec
   std::string class_name_;
   std::vector<std::filesystem::path> python_paths_;
   PythonProcessorType python_processor_type_;
+  std::string qualified_module_name_;
 };
diff --git a/extensions/python/PythonScriptEngine.cpp b/extensions/python/PythonScriptEngine.cpp
index 10893c195..60e661b05 100644
--- a/extensions/python/PythonScriptEngine.cpp
+++ b/extensions/python/PythonScriptEngine.cpp
@@ -243,4 +243,24 @@ std::vector<core::Relationship> PythonScriptEngine::getCustomPythonRelationships
   return relationships;
 }
 
+// Makes the script evaluated in bindings_ look like the module `qualified_module_name`
+// (e.g. "nifi_python_processors.compute.processors.RelativeImporterProcessor"):
+// __name__ becomes the full dotted name and __package__ everything before the last dot,
+// which is what the import system uses to resolve relative imports ("from .utils import x").
+// An attribute whose value would be empty is left unset. Throws PyException if a Python
+// string cannot be created or stored in the bindings.
+void PythonScriptEngine::setModuleAttributes(const std::string& qualified_module_name) {
+  GlobalInterpreterLock gil;
+
+  // Both CPython calls below signal failure (NULL / -1) with a pending Python error; surface it
+  // instead of passing NULL into PyDict_SetItemString or leaving the error indicator set.
+  const auto set_string_attribute = [this](const char* key, const std::string& value) {
+    auto value_object = OwnedObject(PyUnicode_FromStringAndSize(value.data(), gsl::narrow<Py_ssize_t>(value.length())));
+    if (!value_object.get() || PyDict_SetItemString(bindings_.get(), key, value_object.get()) != 0) {
+      throw PyException();
+    }
+  };
+
+  // last_dot == 0 would yield an empty package name, which is not set (same as no dot at all).
+  if (const auto last_dot = qualified_module_name.rfind('.'); last_dot != std::string::npos && last_dot > 0) {
+    set_string_attribute("__package__", qualified_module_name.substr(0, last_dot));
+  }
+
+  if (!qualified_module_name.empty()) {
+    set_string_attribute("__name__", qualified_module_name);
+  }
+}
+
 }  // namespace org::apache::nifi::minifi::extensions::python
diff --git a/extensions/python/PythonScriptEngine.h b/extensions/python/PythonScriptEngine.h
index dd0248516..5823d6f1e 100644
--- a/extensions/python/PythonScriptEngine.h
+++ b/extensions/python/PythonScriptEngine.h
@@ -154,6 +154,7 @@ class PythonScriptEngine {
   void initialize(const core::Relationship& success, const core::Relationship& 
failure, const core::Relationship& original, const 
std::shared_ptr<core::logging::Logger>& logger);
   void initializeProcessorObject(const std::string& python_class_name);
   std::vector<core::Relationship> getCustomPythonRelationships();
+  /// Sets __name__ to qualified_module_name and __package__ to its prefix before the last dot
+  /// in the script's global bindings, so the script's relative imports can be resolved.
+  /// Values that would be empty are not set.
+  void setModuleAttributes(const std::string& qualified_module_name);
 
  private:
   void evalInternal(std::string_view script);

Reply via email to