hunyadi-dev commented on a change in pull request #784: URL: https://github.com/apache/nifi-minifi-cpp/pull/784#discussion_r431867122
########## File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp ########## @@ -0,0 +1,276 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CATCH_CONFIG_MAIN + +#include <memory> +#include <string> +#include <set> + +#include "../TestBase.h" + +#include "processors/GetFile.h" +#include "python/ExecutePythonProcessor.h" +#include "processors/LogAttribute.h" +#include "processors/PutFile.h" +#include "utils/file/FileUtils.h" + +namespace { + +#include <unistd.h> +#define GetCurrentDir getcwd + +std::string GetCurrentWorkingDir(void) { + char buff[FILENAME_MAX]; + GetCurrentDir(buff, FILENAME_MAX); + std::string current_working_dir(buff); + return current_working_dir; +} + +class ExecutePythonProcessorTestBase { + public: + ExecutePythonProcessorTestBase() : + logTestController_(LogTestController::getInstance()), + logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger()) { + reInitialize(); + } + virtual ~ExecutePythonProcessorTestBase() { + logTestController_.reset(); + logTestController_.setDebug<TestPlan>(); + logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>(); + 
logTestController_.setDebug<minifi::processors::PutFile>(); + logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>(); + } + + protected: + void reInitialize() { + testController_.reset(new TestController()); + plan_ = testController_->createPlan(); + } + + std::string createTempDir() { + char dirtemplate[] = "/tmp/gt.XXXXXX"; + std::string temp_dir = testController_->createTempDirectory(dirtemplate); + REQUIRE(!temp_dir.empty()); + struct stat buffer; + REQUIRE(-1 != stat(temp_dir.c_str(), &buffer)); + REQUIRE(S_ISDIR(buffer.st_mode)); + return temp_dir; + } + + std::string putFileToDir(const std::string& dir_path, const std::string& file_name, const std::string& content) { + std::string file_path(dir_path + utils::file::FileUtils::get_separator() + file_name); + std::ofstream out_file(file_path); + if (out_file.is_open()) { + out_file << content; + out_file.close(); + } + return file_path; + } + + std::string createTempDirWithFile(const std::string& file_name, const std::string& content) { + std::string temp_dir = createTempDir(); + putFileToDir(temp_dir, file_name, content); + return temp_dir; + } + + std::string getFileContent(const std::string& file_name) { + std::ifstream file_handle(file_name); + REQUIRE(file_handle.is_open()); + const std::string file_content{ (std::istreambuf_iterator<char>(file_handle)), (std::istreambuf_iterator<char>())}; + file_handle.close(); + return file_content; + } + + std::string getScriptFullPath(const std::string& script_file_name) { + return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() + script_file_name; + } + + const std::string TEST_FILE_NAME{ "test_file.txt" }; + const std::string TEST_FILE_CONTENT{ "Test text\n" }; + const std::string SCRIPT_FILES_DIRECTORY{ "test_scripts" }; + + std::unique_ptr<TestController> testController_; + std::shared_ptr<TestPlan> plan_; + LogTestController& logTestController_; + std::shared_ptr<logging::Logger> logger_; +}; + +class 
SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase { + public: + enum class Expectation { + OUTPUT_FILE_MATCHES_INPUT, + RUNTIME_RELATIONSHIP_EXCEPTION, + PROCESSOR_INITIALIZATION_EXCEPTION + }; + SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {} + + protected: + void testSimpleFilePassthrough(const Expectation expectation, const core::Relationship& execute_python_out_conn, const std::string& used_as_script_file, const std::string& used_as_script_body) { + reInitialize(); + const std::string input_dir = createTempDirWithFile(TEST_FILE_NAME, TEST_FILE_CONTENT); + const std::string output_dir = createTempDir(); + + addGetFileProcessorToPlan(input_dir); + if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) { + REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body)); + return; + } + REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body)); + addPutFileProcessorToPlan(execute_python_out_conn, output_dir); + + plan_->runNextProcessor(); // GetFile + if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) { + REQUIRE_THROWS(plan_->runNextProcessor()); // ExecutePythonProcessor + return; + } + REQUIRE_NOTHROW(plan_->runNextProcessor()); // ExecutePythonProcessor + plan_->runNextProcessor(); // PutFile + + const std::string output_file_path = output_dir + utils::file::FileUtils::get_separator() + TEST_FILE_NAME; + + if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) { + const std::string output_file_content{ getFileContent(output_file_path) }; + REQUIRE(TEST_FILE_CONTENT == output_file_content); + } + } + void testsStatefulProcessor() { + reInitialize(); + const std::string output_dir = createTempDir(); + + auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor"); + plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(), getScriptFullPath("stateful_processor.py")); + executePythonProcessor -> initialize(); + + addPutFileProcessorToPlan(core::Relationship("success", "description"), output_dir); + plan_->runNextProcessor(); // ExecutePythonProcessor + for (std::size_t i = 0; i < 10; ++i) { + plan_->runCurrentProcessor(); // ExecutePythonProcessor + } + plan_->runNextProcessor(); // PutFile + for (std::size_t i = 0; i < 10; ++i) { + plan_->runCurrentProcessor(); // PutFile + const std::string state_name = std::to_string(i); + const std::string output_file_path = output_dir + utils::file::FileUtils::get_separator() + state_name; + const std::string output_file_content{ getFileContent(output_file_path) }; + REQUIRE(output_file_content == state_name); + } + } + + private: + std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const std::string& dir_path) { + std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", "getfileCreate2"); + plan_->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path); + plan_->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true"); + return getfile; + } + + std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const std::string& used_as_script_file, const std::string& used_as_script_body) { + auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", core::Relationship("success", "description"), true); + if ("" != used_as_script_file) { + plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(), getScriptFullPath(used_as_script_file)); + } + if ("" != used_as_script_body) { + plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(), 
getFileContent(getScriptFullPath(used_as_script_body))); + } + executePythonProcessor -> initialize(); + return executePythonProcessor; + } + + std::shared_ptr<core::Processor> addPutFileProcessorToPlan(const core::Relationship& execute_python_outbound_connection, const std::string& dir_path) { + std::shared_ptr<core::Processor> putfile = plan_->addProcessor("PutFile", "putfile", execute_python_outbound_connection, true); + plan_->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir_path); + return putfile; + } +}; + +// Test for python processors for simple passthrough cases +// +// +-----------+ +--------------------------+ +-----------+ +// | Getfile | .-+ ExecutePythonProcessor | .-+ PutFile | +// +-----------+ / + - - - - - - - - + / +-----------+ +// | success +-° | Attribute: Script | / | success +-+ checked +// +-----------+ +--------------------------+ / +-----------+ +// | success +-° +// +--------------------------+ +// | failure +-X either success or failure is hooked up +// +--------------------------+ +// +// testSimpleFilePassthrough(OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py"); +// +// translates to +// +// GIVEN an ExecutePythonProcessor set up with a "Script Body" attribute that transfers to REL_SUCCESS, but not "Script File" +// WHEN the processor is triggered +// THEN any consumer using the success relationship as source should receive the transferred data +// +TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[executePythonProcessorSimple]") { + // Expectations + const auto OUTPUT_FILE_MATCHES_INPUT = SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT; + const auto RUNTIME_RELATIONSHIP_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION; + const auto PROCESSOR_INITIALIZATION_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION; + // 
ExecutePython outbound relationships + const core::Relationship SUCCESS {"success", "description"}; + const core::Relationship FAILURE{"failure", "description"}; + + // For the tests "" is treated as none-provided since no optional implementation was ported to the project yet + + //////////////////////////////////////////////////////////// + // 0. Neither valid script file nor script body provided // + //////////////////////////////////////////////////////////// + + // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY + testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "", ""); // NOLINT + testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, "", ""); // NOLINT + testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "non_existent_script.py", ""); // NOLINT + testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, "non_existent_script.py", ""); // NOLINT + + /////////////////////////////////////// + // 1. Using script file as attribute // + /////////////////////////////////////// + + // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY + testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "passthrough_processor_transfering_to_success.py", ""); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "passthrough_processor_transfering_to_success.py", ""); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_failure.py", ""); // NOLINT + testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "passthrough_processor_transfering_to_failure.py", ""); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "non_transferring_processor.py", ""); // NOLINT + + /////////////////////////////////////// + // 2. 
Using script body as attribute // + /////////////////////////////////////// + + // TEST EXPECTATION OUT_REL SCRIPT_FILE USE_AS_SCRIPT_BODY + testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py"); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py"); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT + testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT + testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "non_transferring_processor.py"); // NOLINT + + //////////////////////////////// + // 3. Setting both attributes // + //////////////////////////////// + Review comment: Removed comment decoration. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
