This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a0ab21f03c813af758e6ed4c47c88caead8d217b Author: Gabor Gyimesi AuthorDate: Wed Oct 15 14:05:21 2025 +0200 MINIFICPP-2640 Remove explicit use of ExecutePythonProcessor Closes #2039 Signed-off-by: Martin Zink --- PROCESSORS.md | 29 ---- README.md | 4 +- docker/test/integration/features/python.feature | 10 -- .../minifi/processors/ExecutePythonProcessor.py | 25 --- .../resources/python/add_attribute_to_flowfile.py | 29 ---- extensions/python/ExecutePythonProcessor.cpp | 164 +++++++------------ extensions/python/ExecutePythonProcessor.h | 49 +----- extensions/python/PYTHON.md | 130 +++++++++------ extensions/python/PythonObjectFactory.h | 1 - .../python/tests/ExecutePythonProcessorTests.cpp | 179 ++++----------------- libminifi/test/libtest/unit/TestBase.h | 13 +- 11 files changed, 189 insertions(+), 444 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index cd4b7141a..2ce4022ec 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -35,7 +35,6 @@ limitations under the License. - [DeleteS3Object](#DeleteS3Object) - [EvaluateJsonPath](#EvaluateJsonPath) - [ExecuteProcess](#ExecuteProcess) -- [ExecutePythonProcessor](#ExecutePythonProcessor) - [ExecuteScript](#ExecuteScript) - [ExecuteSQL](#ExecuteSQL) - [ExtractText](#ExtractText) @@ -719,34 +718,6 @@ In the list below, the names of required properties appear in bold. Any other pr | success | All created FlowFiles are routed to this relationship. | -## ExecutePythonProcessor - -### Description - -DEPRECATED. This processor should only be used internally for running NiFi and MiNiFi C++ style python processors. Do not use this processor in your own flows, move your python processors to the minifi-python directory instead, where they will be parsed, and then they can be used with their filename as the processor class in the flow configuration. - -This processor executes a script given the flow file and a process session.
The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back. Scripts must define an onTrigger function which accepts NiFi Context and ProcessSession objects. Scripts are executed once when the processor is run, then the onTrigger method is called for [...] - -### Properties - -In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. - -| Name | Default Value | Allowable Values | Description | -|-----------------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Script File | | | Path to script file to execute. Only one of Script File or Script Body may be used | -| Script Body | | | Script to execute. Only one of Script File or Script Body may be used | -| Module Directory | | | Comma-separated list of paths to files and/or directories which contain modules required by the script | -| **Reload on Script Change** | true | true<br/>false | If true and Script File property is used, then script file will be reloaded if it has changed, otherwise the first loaded version will be used at all times. | - -### Relationships - -| Name | Description | -|----------|--------------------| -| success | Script succeeds | -| failure | Script fails | -| original | Original flow file | - - ## ExecuteScript ### Description diff --git a/README.md b/README.md index 5faa8b8aa..8e1603b63 100644 --- a/README.md +++ b/README.md @@ -83,14 +83,14 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Exte | Grafana Loki | [PushGrafanaLokiREST](PROCESSORS.md#pushgrafanalokirest)<br>[PushGrafanaLokiGrpc](PROCESSORS.md#pushgrafanalokigrpc) [...] | Kafka | [PublishKafka](PROCESSORS.md#publishkafka)<br>[ConsumeKafka](PROCESSORS.md#consumekafka) [...] | Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) [...] -| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) [...] +| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) [...] | Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) [...] | MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt) [...] | OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) [...] | OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) [...] | PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) [...] | ProcFs (Linux) | [ProcFsMonitor](PROCESSORS.md#procfsmonitor) [...] -| Python Scripting | [ExecuteScript](PROCESSORS.md#executescript)<br>[ExecutePythonProcessor](PROCESSORS.md#executepythonprocessor)<br/>**Custom Python Processors** [...] +| Python Scripting | [ExecuteScript](PROCESSORS.md#executescript)<br/>[**Custom Python Processors**](extensions/python/PYTHON.md) [...] | SMB (Windows) | [FetchSmb](PROCESSORS.md#fetchsmb)<br/>[ListSmb](PROCESSORS.md#listsmb)<br/>[PutSmb](PROCESSORS.md#putsmb) [...] | SFTP | [FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp) [...] | SQL | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/> [...] 
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 08b8dd1c4..1f66d7b3e 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -18,16 +18,6 @@ Feature: MiNiFi can use python processors in its flows Background: Given the content of "/tmp/output" is monitored - Scenario: A MiNiFi instance can update attributes through custom python processor - Given a GenerateFlowFile processor with the "File Size" property set to "0B" - And a ExecutePythonProcessor processor with the "Script File" property set to "/tmp/resources/python/add_attribute_to_flowfile.py" - And a LogAttribute processor - And the "success" relationship of the GenerateFlowFile processor is connected to the ExecutePythonProcessor - And the "success" relationship of the ExecutePythonProcessor processor is connected to the LogAttribute - - When all instances start up - Then the Minifi logs contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds - Scenario: A MiNiFi instance can update attributes through native python processor Given the example MiNiFi python processors are present And a GenerateFlowFile processor with the "File Size" property set to "0B" diff --git a/docker/test/integration/minifi/processors/ExecutePythonProcessor.py b/docker/test/integration/minifi/processors/ExecutePythonProcessor.py deleted file mode 100644 index 22523ae7b..000000000 --- a/docker/test/integration/minifi/processors/ExecutePythonProcessor.py +++ /dev/null @@ -1,25 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class ExecutePythonProcessor(Processor): - def __init__(self, context): - super(ExecutePythonProcessor, self).__init__( - context=context, - clazz='ExecutePythonProcessor', - auto_terminate=['success']) diff --git a/docker/test/integration/resources/python/add_attribute_to_flowfile.py b/docker/test/integration/resources/python/add_attribute_to_flowfile.py deleted file mode 100644 index c7944de14..000000000 --- a/docker/test/integration/resources/python/add_attribute_to_flowfile.py +++ /dev/null @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -def describe(processor): - processor.setDescription("Adds an attribute to your flow files") - - -def onInitialize(processor): - processor.setSupportsDynamicProperties() - - -def onTrigger(context, session): - flow_file = session.get() - if flow_file is not None: - flow_file.addAttribute("Python attribute", "attributevalue") - session.transfer(flow_file, REL_SUCCESS) diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index b9ed72bc1..52c77383d 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -36,90 +36,49 @@ namespace org::apache::nifi::minifi::extensions::python::processors { -void ExecutePythonProcessor::initializeScript() { - if (processor_initialized_) { - logger_->log_debug("Processor has already been initialized, returning..."); - return; - } - - try { - loadScript(); - } catch(const std::runtime_error&) { - return; - } - - // In case of native python processors we require initialization before onSchedule - // so that we can provide manifest of processor identity on C2 - python_script_engine_ = createScriptEngine(); - initalizeThroughScriptEngine(); -} - void ExecutePythonProcessor::initialize() { initializeScript(); - std::vector<core::PropertyReference> all_properties{Properties.begin(), Properties.end()}; + std::vector<core::PropertyReference> all_properties; ranges::transform(python_properties_, std::back_inserter(all_properties), &core::Property::getReference); setSupportedProperties(all_properties); setSupportedRelationships(Relationships); logger_->log_debug("Processor has been initialized."); } -void ExecutePythonProcessor::initalizeThroughScriptEngine() { - try { - appendPathForImportModules(); - python_script_engine_->appendModulePaths(python_paths_); - python_script_engine_->setModuleAttributes(qualified_module_name_); - python_script_engine_->eval(script_to_exec_); - if (python_class_name_) { - 
python_script_engine_->initializeProcessorObject(*python_class_name_); - } - python_script_engine_->describe(this); - python_script_engine_->onInitialize(this); - processor_initialized_ = true; - } catch (const PythonScriptWarning&) { - throw; - } catch (const std::exception& e) { - std::string python_processor_name = python_class_name_ ? *python_class_name_ : script_file_path_; - logger_->log_error("Failed to initialize python processor '{}' due to error: {}", python_processor_name, e.what()); - throw; - } -} - void ExecutePythonProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*sessionFactory*/) { - context.addAutoTerminatedRelationship(Original); - if (!processor_initialized_) { - script_file_path_ = context.getProperty(ScriptFile.name).value_or(""); - script_to_exec_ = context.getProperty(ScriptBody.name).value_or(""); - module_directory_ = context.getProperty(ModuleDirectory.name).value_or(""); - loadScript(); - python_script_engine_ = createScriptEngine(); - initalizeThroughScriptEngine(); - } else { - reloadScriptIfUsingScriptFileProperty(); - if (script_to_exec_.empty()) { - throw std::runtime_error("Neither Script Body nor Script File is available to execute"); - } - } - gsl_Expects(python_script_engine_); + context.addAutoTerminatedRelationship(Original); + reloadScriptFile(); python_script_engine_->eval(script_to_exec_); python_script_engine_->onSchedule(context); - - reload_on_script_change_ = utils::parseBoolProperty(context, ReloadOnScriptChange); } void ExecutePythonProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { - reloadScriptIfUsingScriptFileProperty(); - if (script_to_exec_.empty()) { - throw std::runtime_error("Neither Script Body nor Script File is available to execute"); - } - + gsl_Expects(python_script_engine_); + reloadScriptFile(); python_script_engine_->onTrigger(context, session); } -void ExecutePythonProcessor::appendPathForImportModules() const { - if 
(!module_directory_.empty()) { - python_script_engine_->appendModulePaths(utils::string::splitAndTrimRemovingEmpty(module_directory_, ",") | ranges::to<std::vector<std::filesystem::path>>()); - } +std::vector<core::Relationship> ExecutePythonProcessor::getPythonRelationships() const { + gsl_Expects(python_script_engine_); + std::vector<core::Relationship> relationships{Relationships.begin(), Relationships.end()}; + auto custom_relationships = python_script_engine_->getCustomPythonRelationships(); + relationships.reserve(relationships.size() + std::distance(custom_relationships.begin(), custom_relationships.end())); + relationships.insert(relationships.end(), custom_relationships.begin(), custom_relationships.end()); + return relationships; +} + +void ExecutePythonProcessor::setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) { + gsl_Expects(logger_ && python_logger_); + logger_->setLogCallback(callback); + python_logger_->setLogCallback(callback); +} + +void ExecutePythonProcessor::initializeScript() { + loadScriptFromFile(); + last_script_write_time_ = utils::file::last_write_time(script_file_path_); + python_script_engine_ = createScriptEngine(); + initializeThroughScriptEngine(); } void ExecutePythonProcessor::loadScriptFromFile() { @@ -129,39 +88,47 @@ void ExecutePythonProcessor::loadScriptFromFile() { throw std::runtime_error("Failed to read Script File: " + script_file_path_); } script_to_exec_ = std::string{ (std::istreambuf_iterator<char>(file_handle)), (std::istreambuf_iterator<char>()) }; + if (script_to_exec_.empty()) { + throw std::runtime_error("Script body to execute is empty"); + } } -void ExecutePythonProcessor::loadScript() { - if (script_file_path_.empty() && script_to_exec_.empty()) { - throw std::runtime_error("Neither Script Body nor Script File is available to execute"); - } +std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() { + auto engine = 
std::make_unique<PythonScriptEngine>(); + engine->initialize(Success, Failure, Original, python_logger_); + return engine; +} - if (!script_file_path_.empty()) { - if (!script_to_exec_.empty()) { - throw std::runtime_error("Only one of Script File or Script Body may be used"); +void ExecutePythonProcessor::initializeThroughScriptEngine() { + gsl_Expects(python_script_engine_); + try { + python_script_engine_->appendModulePaths(python_paths_); + python_script_engine_->setModuleAttributes(qualified_module_name_); + python_script_engine_->eval(script_to_exec_); + if (python_class_name_) { + python_script_engine_->initializeProcessorObject(*python_class_name_); } - loadScriptFromFile(); - last_script_write_time_ = utils::file::last_write_time(script_file_path_); + python_script_engine_->describe(this); + python_script_engine_->onInitialize(this); + } catch (const PythonScriptWarning&) { + throw; + } catch (const std::exception& e) { + std::string python_processor_name = python_class_name_ ? 
*python_class_name_ : script_file_path_; + logger_->log_error("Failed to initialize python processor '{}' due to error: {}", python_processor_name, e.what()); + throw; } } -void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() { - if (script_file_path_.empty() || !reload_on_script_change_) { - return; - } +void ExecutePythonProcessor::reloadScriptFile() { auto file_write_time = utils::file::last_write_time(script_file_path_); - if (file_write_time != last_script_write_time_) { - logger_->log_debug("Script file has changed since last time, reloading..."); - loadScriptFromFile(); - last_script_write_time_ = file_write_time; - python_script_engine_->eval(script_to_exec_); + if (file_write_time == last_script_write_time_) { + return; } -} -std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() { - auto engine = std::make_unique<PythonScriptEngine>(); - engine->initialize(Success, Failure, Original, python_logger_); - return engine; + logger_->log_debug("Script file has changed since last time, reloading..."); + loadScriptFromFile(); + last_script_write_time_ = file_write_time; + python_script_engine_->eval(script_to_exec_); } namespace { @@ -177,8 +144,7 @@ enum class PropertyValidatorCode : int64_t { const core::PropertyValidator& translateCodeToPropertyValidator(const PropertyValidatorCode& code) { switch (code) { - case PropertyValidatorCode::INTEGER: // NOLINT(*-branch-clone) - return core::StandardPropertyValidators::INTEGER_VALIDATOR; + case PropertyValidatorCode::INTEGER: case PropertyValidatorCode::LONG: return core::StandardPropertyValidators::INTEGER_VALIDATOR; case PropertyValidatorCode::BOOLEAN: @@ -219,20 +185,4 @@ void ExecutePythonProcessor::addProperty(const std::string &name, const std::str python_properties_.emplace_back(property); } -std::vector<core::Relationship> ExecutePythonProcessor::getPythonRelationships() const { - std::vector<core::Relationship> relationships{Relationships.begin(), 
Relationships.end()}; - auto custom_relationships = python_script_engine_->getCustomPythonRelationships(); - relationships.reserve(relationships.size() + std::distance(custom_relationships.begin(), custom_relationships.end())); - relationships.insert(relationships.end(), custom_relationships.begin(), custom_relationships.end()); - return relationships; -} - -void ExecutePythonProcessor::setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) { - gsl_Expects(logger_ && python_logger_); - logger_->setLogCallback(callback); - python_logger_->setLogCallback(callback); -} - -REGISTER_RESOURCE(ExecutePythonProcessor, Processor); - } // namespace org::apache::nifi::minifi::extensions::python::processors diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index ae9042428..5f2d13b00 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -42,43 +42,14 @@ class ExecutePythonProcessor : public core::ProcessorImpl { public: explicit ExecutePythonProcessor(core::ProcessorMetadata metadata) : ProcessorImpl(metadata), - processor_initialized_(false), - python_dynamic_(false), - reload_on_script_change_(true) { + python_dynamic_(false) { python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName(), metadata.uuid); } - EXTENSIONAPI static constexpr const char* Description = "DEPRECATED. This processor should only be used internally for running NiFi and MiNiFi C++ style python processors. " - "Do not use this processor in your own flows, move your python processors to the minifi-python directory instead, where they will be parsed, " - "and then they can be used with their filename as the processor class in the flow configuration.\n\n" - "This processor executes a script given the flow file and a process session. 
" - "The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as " - "any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back. Scripts must define an onTrigger function which accepts NiFi Context " - "and ProcessSession objects. Scripts are executed once when the processor is run, then the onTrigger method is called for each incoming flowfile. This enables scripts to keep state " - "if they wish. The python script files are expected to contain `describe(processor)` and `onTrigger(context, session)`."; - - EXTENSIONAPI static constexpr auto ScriptFile = core::PropertyDefinitionBuilder<>::createProperty("Script File") - .withDescription("Path to script file to execute. Only one of Script File or Script Body may be used") - .build(); - EXTENSIONAPI static constexpr auto ScriptBody = core::PropertyDefinitionBuilder<>::createProperty("Script Body") - .withDescription("Script to execute. 
Only one of Script File or Script Body may be used") - .build(); - EXTENSIONAPI static constexpr auto ModuleDirectory = core::PropertyDefinitionBuilder<>::createProperty("Module Directory") - .withDescription("Comma-separated list of paths to files and/or directories which contain modules required by the script") - .build(); - EXTENSIONAPI static constexpr auto ReloadOnScriptChange = core::PropertyDefinitionBuilder<>::createProperty("Reload on Script Change") - .withDescription("If true and Script File property is used, then script file will be reloaded if it has changed, otherwise the first loaded version will be used at all times.") - .isRequired(true) - .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) - .withDefaultValue("true") - .build(); - EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ - ScriptFile, - ScriptBody, - ModuleDirectory, - ReloadOnScriptChange - }); + EXTENSIONAPI static constexpr const char* Description = "This processor is only used internally for running NiFi and MiNiFi C++ style python processors. Do not use this processor in your own " + "flows. 
Move your python processors to the minifi-python directory where they will be parsed and then they can be used with filename as processor classes."; + EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{}; EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Script succeeds"}; EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Script fails"}; @@ -96,7 +67,6 @@ class ExecutePythonProcessor : public core::ProcessorImpl { bool isSingleThreaded() const override { return IsSingleThreaded; } ADD_GET_PROCESSOR_NAME - void initializeScript(); void initialize() override; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; @@ -155,11 +125,9 @@ class ExecutePythonProcessor : public core::ProcessorImpl { std::string description_; std::optional<std::string> version_; - bool processor_initialized_; bool python_dynamic_; std::string script_to_exec_; - bool reload_on_script_change_; std::optional<std::chrono::file_clock::time_point> last_script_write_time_; std::string script_file_path_; std::shared_ptr<core::logging::Logger> python_logger_; @@ -167,15 +135,12 @@ class ExecutePythonProcessor : public core::ProcessorImpl { std::optional<std::string> python_class_name_; std::vector<std::filesystem::path> python_paths_; std::string qualified_module_name_; - std::string module_directory_; - void appendPathForImportModules() const; + void initializeScript(); void loadScriptFromFile(); - void loadScript(); - void reloadScriptIfUsingScriptFileProperty(); - void initalizeThroughScriptEngine(); - std::unique_ptr<PythonScriptEngine> createScriptEngine(); + void initializeThroughScriptEngine(); + void reloadScriptFile(); }; } // namespace org::apache::nifi::minifi::extensions::python::processors diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md index 
b5b8ae658..13044261d 100644 --- a/extensions/python/PYTHON.md +++ b/extensions/python/PYTHON.md @@ -16,14 +16,21 @@ # Apache NiFi - MiNiFi - Python Processors Readme -This readme defines the configuration parameters to use ExecutePythonProcessor to run native python processors. ExecutePythonProcessor is not used explicitly in the flow, instead the python processors are referenced directly by their filename in the flow configuration, ExecutePythonProcessor only manages the execution of these processors under the hood. +This readme defines the configuration parameters to run native python processors. ## Table of Contents - [Requirements](#requirements) + - [CentOS/RHEL system python](#centosrhel-system-python) + - [Debian/Ubuntu system python](#debianubuntu-system-python) + - [Windows system python](#windows-system-python) + - [Anaconda](#anaconda) + - [PyEnv](#pyenv) - [Description](#description) - [Installation](#installation) - [Configuration](#configuration) - [Installing Python processors](#installing-python-processors) + - [Example: Sentiment Analysis](#example-sentiment-analysis) +- [Using MiNiFi C++ style native python processors](#using-minifi-c-style-native-python-processors) - [Using NiFi Python Processors](#using-nifi-python-processors) - [Use Python processors from virtualenv](#use-python-processors-from-virtualenv) - [Automatically install dependencies from requirements.txt files](#automatically-install-dependencies-from-requirementstxt-files) @@ -75,10 +82,13 @@ export LD_LIBRARY_PATH="${PYENV_ROOT}/versions/${PY_VERSION}/lib${LD_LIBRARY_PAT ## Description -Python native processors can be updated at any time by simply adding a new processor to the directory defined in -the `minifi.properties` file in the `nifi.python.processor.dir` property. 
The processor name, when provided to MiNiFi C++ and any C2 manifest will be that +There are two types of python processors that can be used: MiNiFi C++ style native python processors and NiFi style python processors. MiNiFi C++ style native python processors are defined using a simple python API, while NiFi python processors use the NiFi python API documented in the [Apache NiFi Python Developer’s Guide](https://nifi.apache.org/nifi-docs/python-developer-guide.html). This readme describes the use of both types of processors. + +Python processors can be updated at any time by simply adding a new processor to the minifi-python directory defined in +the `minifi.properties` file in the `nifi.python.processor.dir` property. For NiFi style python processors the `${nifi.python.processor.dir}/nifi_python_processors` directory should be used. +The processor name, when provided to MiNiFi C++ and any C2 manifest will be that of the name of the python script. For example, "AttributePrinter.py" will be named and referenced in the flow -as "org.apache.nifi.minifi.processors.AttributePrinter" and would look something like this in the flow configuration: +as "org.apache.nifi.minifi.processors.AttributePrinter" and would look like this in the flow configuration: ```yaml - name: My AttributePrinter @@ -93,55 +103,25 @@ as "org.apache.nifi.minifi.processors.AttributePrinter" and would look something Attributes To Print: filename,path ``` -Every python processor has a success, failure and original relationship where the original relationship is auto-terminated by default. - -Methods that are enabled within the processor are describe, onSchedule, onInitialize, and onTrigger. - -Describe is passed the processor and is a required function. You must set the description like so: - -```python -def describe(processor): - processor.setDescription("Adds an attribute to your flow files") -``` - -onInitialize is also passed the processor reference and can be where you set properties. 
The first argument is the property display name, -followed by the description, and default value. The next three arguments are booleans describing if the property is required, support expression language, and if it is a sensitive property. -The seventh argument is the property type code. The property type code is an integer that represents the type of the property. The supported property type codes and their corresponding types: -``` -INTEGER = 0 -LONG = 1 -BOOLEAN = 2 -DATA_SIZE = 3 -TIME_PERIOD = 4 -NON_BLANK = 5 -PORT = 6 -``` - -The last parameter of addProperty is the controller service type. If the property is a controller service, the controller service type should be provided. It should be the non-qualified type name of the controller service. Currently SSLContextService is the only controller service type supported. +The class name must follow the directory hierarchy after the `org.apache.nifi.minifi.processors` prefix. For example, if there is a NiFi style python processor in the nifi_python_processors directory in the mypackage package (which means placed in the `minifi-python/nifi_python_processors/mypackage` directory) and the processor file is named MyProcessor.py, the class name will be `org.apache.nifi.minifi.processors.nifi_python_processors.mypackage.MyProcessor`: -```python -def onInitialize(processor): - processor.setSupportsDynamicProperties() - # arguments: property name, description, default value, is required, expression language supported, is sensitive, property type code, controller service type name - processor.addProperty("property name", "description", "default value", True, False, False, 1, None) +```yaml +- name: My MyProcessor + id: e143601d-de4f-44ba-a6ec-d1f97d77ec94 + class: org.apache.nifi.minifi.processors.nifi_python_processors.mypackage.MyProcessor + scheduling strategy: EVENT_DRIVEN + auto-terminated relationships list: + - failure + - success + - original + Properties: ``` -The onSchedule function is passed the context and 
session factory. This should be where your processor loads and reads properties via -the getProperty function. onTrigger is executed and passed the processor context and session. You may keep state within your processor. - -Much like C++ processors, callbacks may be defined for reading/writing streams of data through the session. Those callback classes will -have a process function that accepts the input stream. You may use codecs getReader to read that data as in the example, below, from -VaderSentiment +Every python processor has a success, failure and original relationship where the original relationship is auto-terminated by default. -```python -class VaderSentiment(object): - def __init__(self): - self.content = None +To see some example minifi style native python processors, please refer to the pythonprocessor-examples in the repository under the extensions/python/pythonprocessor-examples directory. - def process(self, input_stream): - self.content = codecs.getreader('utf-8')(input_stream).read() - return len(self.content) -``` +For NiFi style python processor examples please refer to the [Apache NiFi Python Extensions](https://github.com/apache/nifi-python-extensions) repository. ## Installation @@ -165,6 +145,8 @@ to the reference class name. #directory where processors exist nifi.python.processor.dir=XXXX +NiFi style python processor specific configuration options are described in the [Using NiFi Python Processors](#using-nifi-python-processors) section. + ## Installing Python processors @@ -190,9 +172,59 @@ The SentimentAnalysis processor will perform a Vader Sentiment Analysis. This re pip install VaderSentiment +## Using MiNiFi C++ style native python processors + +In MiNiFi C++ style native python processors, methods that are enabled within the processor are describe, onSchedule, onInitialize, and onTrigger. + +Describe is passed the processor and is a required function. 
You must set the description like so: + +```python +def describe(processor): + processor.setDescription("Adds an attribute to your flow files") +``` + +onInitialize is also passed the processor reference and can be where you set properties. The first argument is the property display name, +followed by the description, and default value. The next three arguments are booleans describing if the property is required, support expression language, and if it is a sensitive property. +The seventh argument is the property type code. The property type code is an integer that represents the type of the property. The supported property type codes and their corresponding types: +``` +INTEGER = 0 +LONG = 1 +BOOLEAN = 2 +DATA_SIZE = 3 +TIME_PERIOD = 4 +NON_BLANK = 5 +PORT = 6 +``` + +The last parameter of addProperty is the controller service type. If the property is a controller service, the controller service type should be provided. It should be the non-qualified type name of the controller service. Currently SSLContextService is the only controller service type supported. + +```python +def onInitialize(processor): + processor.setSupportsDynamicProperties() + # arguments: property name, description, default value, is required, expression language supported, is sensitive, property type code, controller service type name + processor.addProperty("property name", "description", "default value", True, False, False, 1, None) +``` + +The onSchedule function is passed the context and session factory. This should be where your processor loads and reads properties via +the getProperty function. onTrigger is executed and passed the processor context and session. You may keep state within your processor. + +Much like C++ processors, callbacks may be defined for reading/writing streams of data through the session. Those callback classes will +have a process function that accepts the input stream. 
You may use `codecs.getreader` to read that data as in the example below, from +VaderSentiment + +```python +class VaderSentiment(object): + def __init__(self): + self.content = None + + def process(self, input_stream): + self.content = codecs.getreader('utf-8')(input_stream).read() + return len(self.content) +``` + ## Using NiFi Python Processors -MiNiFi C++ supports the use of NiFi Python processors, that are inherited from the FlowFileTransform, RecordTransform or the FlowFileSource base class. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org [...] +MiNiFi C++ supports the use of NiFi Python processors, that are inherited from the FlowFileTransform, RecordTransform or the FlowFileSource base class. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org [...] In the flow configuration these Python processors can be referenced by their fully qualified class name, which looks like this: org.apache.nifi.minifi.processors.nifi_python_processors.<package_name>.<processor_name>. For example, the fully qualified class name of the PromptChatGPT processor implemented in the file nifi_python_processors/PromptChatGPT.py is org.apache.nifi.minifi.processors.nifi_python_processors.PromptChatGPT. If a processor is copied under a subdirectory, because it is [...] 
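The class-name rule described in the PYTHON.md changes can be sketched as a small helper. The rule is the `org.apache.nifi.minifi.processors` prefix, then the directory hierarchy, then the file name without `.py`. This is an illustrative sketch, not part of MiNiFi C++. `processor_class_name` is a hypothetical name, and the input path is assumed to be relative to the python directory (by default `${minifi_root}/minifi-python`).

```python
from pathlib import PurePosixPath

# Prefix that every Python processor class name starts with (per PYTHON.md).
CLASS_NAME_PREFIX = "org.apache.nifi.minifi.processors"


def processor_class_name(relative_path: str) -> str:
    """Hypothetical helper: derive the flow-configuration class name of a
    Python processor from its path relative to the python directory."""
    path = PurePosixPath(relative_path)
    if path.suffix != ".py":
        raise ValueError(f"expected a .py file, got: {relative_path}")
    # Each directory becomes a package segment; the file stem is the class name.
    return ".".join([CLASS_NAME_PREFIX, *path.parent.parts, path.stem])


print(processor_class_name("nifi_python_processors/mypackage/MyProcessor.py"))
```

For `nifi_python_processors/PromptChatGPT.py` the helper yields `org.apache.nifi.minifi.processors.nifi_python_processors.PromptChatGPT`. This matches the PromptChatGPT example in PYTHON.md.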
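Naming the codes makes the property type codes and the eight-argument `addProperty` order documented for `onInitialize` easier to read. The sketch below is hypothetical: `PropertyTypeCode` and `RecordingProcessor` are not MiNiFi APIs. The stand-in class only records the calls so the code runs outside MiNiFi. Passing the code as a plain `int` is an assumption about what the real binding expects.

```python
from enum import IntEnum


class PropertyTypeCode(IntEnum):
    """Hypothetical names for the property type codes listed in PYTHON.md."""
    INTEGER = 0
    LONG = 1
    BOOLEAN = 2
    DATA_SIZE = 3
    TIME_PERIOD = 4
    NON_BLANK = 5
    PORT = 6


class RecordingProcessor:
    """Hypothetical stand-in for the processor object MiNiFi passes in;
    it only records calls so this sketch runs on its own."""

    def __init__(self):
        self.dynamic_properties = False
        self.properties = []

    def setSupportsDynamicProperties(self):
        self.dynamic_properties = True

    def addProperty(self, name, description, default_value, required,
                    expression_language, sensitive, type_code,
                    controller_service_type):
        self.properties.append((name, description, default_value, required,
                                expression_language, sensitive, type_code,
                                controller_service_type))


def onInitialize(processor):
    processor.setSupportsDynamicProperties()
    # Same argument order as the documented example; the type code is named,
    # then converted to a plain int (assumed to be what the binding accepts).
    processor.addProperty("Batch Size", "Number of records per batch", "10",
                          True, False, False, int(PropertyTypeCode.INTEGER), None)


processor = RecordingProcessor()
onInitialize(processor)
```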
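The read-callback pattern from the VaderSentiment example can run on its own once `codecs` is imported. In this sketch `ContentReader` is a hypothetical name. `io.BytesIO` stands in for the input stream that MiNiFi passes to `process`.

```python
import codecs
import io


class ContentReader:
    """Read callback modeled on the VaderSentiment example."""

    def __init__(self):
        self.content = None

    def process(self, input_stream):
        # Wrap the binary stream in a UTF-8 decoding reader and read it fully.
        self.content = codecs.getreader("utf-8")(input_stream).read()
        # Return the length of the decoded text, as the original example does.
        return len(self.content)


# io.BytesIO is only a stand-in so the example runs outside MiNiFi.
reader = ContentReader()
count = reader.process(io.BytesIO("héllo".encode("utf-8")))
print(reader.content, count)
```

Because the stream is decoded first, the returned length counts characters rather than bytes. For example, "héllo" is 6 bytes in UTF-8 but 5 characters.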
diff --git a/extensions/python/PythonObjectFactory.h b/extensions/python/PythonObjectFactory.h index 8073dc6af..9910a9619 100644 --- a/extensions/python/PythonObjectFactory.h +++ b/extensions/python/PythonObjectFactory.h @@ -57,7 +57,6 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::ProcessorFac } ptr->setQualifiedModuleName(qualified_module_name_); ptr->setScriptFilePath(file_); - ptr->initializeScript(); return ptr; } diff --git a/extensions/python/tests/ExecutePythonProcessorTests.cpp b/extensions/python/tests/ExecutePythonProcessorTests.cpp index 7585de008..f04c4e3a4 100644 --- a/extensions/python/tests/ExecutePythonProcessorTests.cpp +++ b/extensions/python/tests/ExecutePythonProcessorTests.cpp @@ -82,17 +82,23 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase { public: enum class Expectation { OUTPUT_FILE_MATCHES_INPUT, - RUNTIME_RELATIONSHIP_EXCEPTION + RUNTIME_RELATIONSHIP_EXCEPTION, + INITIALIZATION_EXCEPTION }; protected: - void testSimpleFilePassthrough(const Expectation expectation, const core::Relationship& execute_python_out_conn, const std::string& used_as_script_file, const std::string& used_as_script_body) { + void testSimpleFilePassthrough(const Expectation expectation, const core::Relationship& execute_python_out_conn, const std::string& used_as_script_file) { reInitialize(); auto input_dir = testController_->createTempDirectory(); putFileToDir(input_dir, TEST_FILE_NAME, TEST_FILE_CONTENT); addGetFileProcessorToPlan(input_dir); - REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body)); + if (Expectation::INITIALIZATION_EXCEPTION == expectation) { + REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file)); + return; + } else { + REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file)); + } const auto output_dir = testController_->createTempDirectory(); addPutFileProcessorToPlan(execute_python_out_conn, output_dir.string()); @@ -115,10 
+121,7 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase { void testsStatefulProcessor() { reInitialize(); const auto output_dir = testController_->createTempDirectory(); - - auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor"); - plan_->setProperty(executePythonProcessor, minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, getScriptFullPath("stateful_processor.py").string()); - + addExecutePythonProcessorToPlan(getScriptFullPath("stateful_processor.py"), false); addPutFileProcessorToPlan(core::Relationship("success", "description"), output_dir); plan_->runNextProcessor(); // ExecutePythonProcessor for (std::size_t i = 0; i < 10; ++i) { @@ -141,15 +144,17 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase { return getfile; } - core::Processor* addExecutePythonProcessorToPlan(const std::filesystem::path& used_as_script_file, const std::string& used_as_script_body) { - auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", core::Relationship("success", "description"), true); - if (!used_as_script_file.empty()) { - plan_->setProperty(executePythonProcessor, minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, getScriptFullPath(used_as_script_file).string()); - } - if (!used_as_script_body.empty()) { - plan_->setProperty(executePythonProcessor, minifi::extensions::python::processors::ExecutePythonProcessor::ScriptBody, getFileContent(getScriptFullPath(used_as_script_body))); - } - return executePythonProcessor; + core::Processor* addExecutePythonProcessorToPlan(const std::filesystem::path& used_as_script_file, bool link_to_previous = true) { + auto uuid = utils::IdGenerator::getIdGenerator()->generate(); + auto execute_python_processor = std::make_unique<minifi::extensions::python::processors::ExecutePythonProcessor>(core::ProcessorMetadata{ + .uuid = uuid, + .name = 
"executePythonProcessor", + .logger = logging::LoggerFactory<minifi::extensions::python::processors::ExecutePythonProcessor>::getLogger(uuid) + }); + execute_python_processor->setScriptFilePath(getScriptFullPath(used_as_script_file).string()); + auto execute_python_processor_unique_ptr = std::make_unique<core::Processor>(execute_python_processor->getName(), execute_python_processor->getUUID(), std::move(execute_python_processor)); + auto processor = plan_->addProcessor(std::move(execute_python_processor_unique_ptr), "executePythonProcessor", core::Relationship("success", "description"), link_to_previous); + return processor; } core::Processor* addPutFileProcessorToPlan(const core::Relationship& execute_python_outbound_connection, const std::filesystem::path& dir_path) { @@ -157,56 +162,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase { plan_->setProperty(putfile, minifi::processors::PutFile::Directory, dir_path.string()); return putfile; } - - void testReloadOnScriptProperty(std::optional<bool> reload_on_script_change, uint32_t expected_success_file_count, uint32_t expected_failure_file_count) { - const auto input_dir = testController_->createTempDirectory(); - putFileToDir(input_dir, TEST_FILE_NAME, TEST_FILE_CONTENT); - addGetFileProcessorToPlan(input_dir); - auto script_content{ getFileContent(getScriptFullPath("passthrough_processor_transfering_to_success.py")) }; - const auto reloaded_script_dir = testController_->createTempDirectory(); - putFileToDir(reloaded_script_dir, "reloaded_script.py", script_content); - - auto execute_python_processor = addExecutePythonProcessorToPlan(reloaded_script_dir / "reloaded_script.py", ""); - if (reload_on_script_change) { - plan_->setProperty(execute_python_processor, minifi::extensions::python::processors::ExecutePythonProcessor::ReloadOnScriptChange, *reload_on_script_change ? 
"true" : "false"); - } - - auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false); - plan_->addConnection(execute_python_processor, {"success", "d"}, success_putfile); - success_putfile->setAutoTerminatedRelationships(std::array{core::Relationship{"success", "d"}, core::Relationship{"failure", "d"}}); - auto success_output_dir = testController_->createTempDirectory(); - plan_->setProperty(success_putfile, minifi::processors::PutFile::Directory, success_output_dir.string()); - - auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false); - plan_->addConnection(execute_python_processor, {"failure", "d"}, failure_putfile); - failure_putfile->setAutoTerminatedRelationships(std::array{core::Relationship{"success", "d"}, core::Relationship{"failure", "d"}}); - auto failure_output_dir = testController_->createTempDirectory(); - plan_->setProperty(failure_putfile, minifi::processors::PutFile::Directory, failure_output_dir.string()); - - testController_->runSession(plan_); - plan_->reset(); - script_content = getFileContent(getScriptFullPath("passthrough_processor_transfering_to_failure.py")); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // make sure the file gets newer modification time - putFileToDir(reloaded_script_dir, "reloaded_script.py", script_content); - testController_->runSession(plan_); - - std::vector<std::string> file_contents; - - auto lambda = [&file_contents](const std::filesystem::path& path, const std::filesystem::path& filename) -> bool { - std::ifstream is(path / filename, std::ifstream::binary); - file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); - return true; - }; - - utils::file::FileUtils::list_dir(success_output_dir, lambda, plan_->getLogger(), false); - - REQUIRE(file_contents.size() == expected_success_file_count); - - file_contents.clear(); - 
utils::file::FileUtils::list_dir(failure_output_dir, lambda, plan_->getLogger(), false); - - REQUIRE(file_contents.size() == expected_failure_file_count); - } }; // Test for python processors for simple passthrough cases @@ -221,95 +176,29 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase { // TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[executePythonProcessorSimple]") { // Expectations - const auto OUTPUT_FILE_MATCHES_INPUT = SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT; - const auto RUNTIME_RELATIONSHIP_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION; + const auto OUTPUT_FILE_MATCHES_INPUT = SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT; + const auto RUNTIME_RELATIONSHIP_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION; + const auto INITIALIZATION_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::INITIALIZATION_EXCEPTION; // ExecutePython outbound relationships - const core::Relationship SUCCESS {"success", "description"}; + const core::Relationship SUCCESS{"success", "description"}; const core::Relationship FAILURE{"failure", "description"}; - // For the tests "" is treated as none-provided since no optional implementation was ported to the project yet - // 0. 
Neither valid script file nor script body provided - // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", ""); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", ""); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "non_existent_script.py", ""); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "non_existent_script.py", ""); // NOLINT + // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE + testSimpleFilePassthrough(INITIALIZATION_EXCEPTION, SUCCESS, "non_existent_script.py"); // NOLINT + testSimpleFilePassthrough(INITIALIZATION_EXCEPTION, FAILURE, "non_existent_script.py"); // NOLINT // 1. Using script file as attribute - // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY - testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "passthrough_processor_transfering_to_success.py", ""); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "passthrough_processor_transfering_to_success.py", ""); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_failure.py", ""); // NOLINT - testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "passthrough_processor_transfering_to_failure.py", ""); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "non_transferring_processor.py", ""); // NOLINT - - // 2. 
Using script body as attribute - // TEST EXPECTATION OUT_REL SCRIPT_FILE USE_AS_SCRIPT_BODY - testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py"); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py"); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT - testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "non_transferring_processor.py"); // NOLINT - - // 3. Setting both attributes - // TEST EXPECTATION OUT_REL SCRIPT_FILE USE_AS_SCRIPT_BODY - testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_success.py", "passthrough_processor_transfering_to_success.py"); // NOLINT + // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE + testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "passthrough_processor_transfering_to_success.py"); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "passthrough_processor_transfering_to_success.py"); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_failure.py"); // NOLINT + testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "passthrough_processor_transfering_to_failure.py"); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "non_transferring_processor.py"); // NOLINT } TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Stateful execution", "[executePythonProcessorStateful]") { testsStatefulProcessor(); } -TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Test the Reload On Script Change property", "[executePythonProcessorReloadScript]") { - SECTION("When Reload 
On Script Change is not set the script is reloaded by default") { - const uint32_t EXPECTED_SUCCESS_FILE_COUNT = 1; - const uint32_t EXPECTED_FAILURE_FILE_COUNT = 1; - testReloadOnScriptProperty(std::nullopt, EXPECTED_SUCCESS_FILE_COUNT, EXPECTED_FAILURE_FILE_COUNT); - } - - SECTION("When Reload On Script Change is set to true both transfer to success and failure scripts are run") { - const uint32_t EXPECTED_SUCCESS_FILE_COUNT = 1; - const uint32_t EXPECTED_FAILURE_FILE_COUNT = 1; - testReloadOnScriptProperty(true, EXPECTED_SUCCESS_FILE_COUNT, EXPECTED_FAILURE_FILE_COUNT); - } - - SECTION("When Reload On Script Change is set to false only transfer to success script is run") { - const uint32_t EXPECTED_SUCCESS_FILE_COUNT = 1; - const uint32_t EXPECTED_FAILURE_FILE_COUNT = 0; - testReloadOnScriptProperty(false, EXPECTED_SUCCESS_FILE_COUNT, EXPECTED_FAILURE_FILE_COUNT); - } -} - -TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Test module load of processor", "[executePythonProcessorModuleLoad]") { - const auto input_dir = testController_->createTempDirectory(); - putFileToDir(input_dir, TEST_FILE_NAME, TEST_FILE_CONTENT); - addGetFileProcessorToPlan(input_dir); - - auto execute_python_processor = addExecutePythonProcessorToPlan("foo_bar_processor.py", ""); - plan_->setProperty(execute_python_processor, minifi::extensions::python::processors::ExecutePythonProcessor::ModuleDirectory, - getScriptFullPath(std::filesystem::path("foo_modules")/"foo.py").string() + "," + getScriptFullPath("bar_modules").string()); - - auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false); - plan_->addConnection(execute_python_processor, {"success", "d"}, success_putfile); - success_putfile->setAutoTerminatedRelationships(std::array{core::Relationship{"success", "d"}, core::Relationship{"failure", "d"}}); - auto success_output_dir = testController_->createTempDirectory(); - plan_->setProperty(success_putfile, 
minifi::processors::PutFile::Directory, success_output_dir.string()); - - testController_->runSession(plan_); - plan_->reset(); - - std::vector<std::string> file_contents; - - auto lambda = [&file_contents](const std::filesystem::path& path, const std::filesystem::path& filename) -> bool { - std::ifstream is(path / filename, std::ifstream::binary); - file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); - return true; - }; - - utils::file::FileUtils::list_dir(success_output_dir, lambda, plan_->getLogger(), false); - - REQUIRE(file_contents.size() == 1); -} - } // namespace diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h index 68759e462..b410f9fae 100644 --- a/libminifi/test/libtest/unit/TestBase.h +++ b/libminifi/test/libtest/unit/TestBase.h @@ -265,11 +265,14 @@ class TestPlan { const minifi::core::Relationship& relationship = minifi::core::Relationship("success", "description"), bool linkToPrevious = false) { return addProcessor(processor_name, name, { relationship }, linkToPrevious); } - minifi::core::Processor* addProcessor(std::unique_ptr<minifi::core::Processor> processor, const std::string &name, const std::initializer_list<minifi::core::Relationship>& relationships, bool linkToPrevious = false); // NOLINT - minifi::core::Processor* addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<minifi::core::Relationship>& relationships, bool linkToPrevious = false); // NOLINT - minifi::core::Processor* addProcessor(const std::string &processor_name, const minifi::utils::Identifier& uuid, const std::string &name, const std::initializer_list<minifi::core::Relationship>& relationships, bool linkToPrevious = false); // NOLINT - - minifi::Connection* addConnection(minifi::core::Processor* source_proc, const minifi::core::Relationship& source_relationship, minifi::core::Processor* destination_proc); // NOLINT + 
minifi::core::Processor* addProcessor(std::unique_ptr<minifi::core::Processor> processor, const std::string &name, const std::initializer_list<minifi::core::Relationship>& relationships, + bool linkToPrevious = false); + minifi::core::Processor* addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<minifi::core::Relationship>& relationships, + bool linkToPrevious = false); + minifi::core::Processor* addProcessor(const std::string &processor_name, const minifi::utils::Identifier& uuid, const std::string &name, + const std::initializer_list<minifi::core::Relationship>& relationships, bool linkToPrevious = false); + + minifi::Connection* addConnection(minifi::core::Processor* source_proc, const minifi::core::Relationship& source_relationship, minifi::core::Processor* destination_proc); std::shared_ptr<minifi::core::controller::ControllerServiceNode> addController(const std::string &controller_name, const std::string &name);
