szaszm commented on a change in pull request #784:
URL: https://github.com/apache/nifi-minifi-cpp/pull/784#discussion_r425930634



##########
File path: libminifi/test/script-tests/PythonExecuteScriptTests.cpp
##########
@@ -29,6 +29,15 @@
 #include "processors/GetFile.h"
 #include "processors/PutFile.h"
 
+// ,-------------------------------------------------------------,
+// | ! |                      Disclaimer                     | ! |
+// |---'                                                     '---'
+// |                                                             |
+// | This file contains tests for the "ExecuteScript" processor, |
+// |         not for the "ExecutePython" processor.              |
+// |                                                             |
+// '-------------------------------------------------------------'

Review comment:
       I'm against ASCII art in the code.
   
   
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#nl3-keep-comments-crisp

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
 
-    triggerSchedule(engine, context);
+    engine->eval(script_to_exec_);
+    engine->onSchedule(context);
 
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-  } catch (...) {
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
   }
 }
 
 void ExecutePythonProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+  if (!valid_init_) {
+    throw std::runtime_error("Could not correctly initialize " + getName());
+  }
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
 
-    triggerEngineProcessor(engine, context, session);
-
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+    engine->onTrigger(context, session);
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception &exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
     this->yield();
-  } catch (...) {
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
     this->yield();
   }
 }
 
+// TODO(hunyadi): This is potentially not what we want. See 
https://issues.apache.org/jira/browse/MINIFICPP-1222
+std::shared_ptr<python::PythonScriptEngine> 
ExecutePythonProcessor::getScriptEngine() {
+  std::shared_ptr<python::PythonScriptEngine> engine;
+  // Use an existing engine, if one is available
+  if (script_engine_q_.try_dequeue(engine)) {
+    logger_->log_debug("Using available [%p] script engine instance", 
engine.get());
+    return engine;
+  }
+  engine = createEngine<python::PythonScriptEngine>();
+  logger_->log_info("Created new [%p] script instance", engine.get());
+  logger_->log_info("Approximately %d script instances created for this 
processor", script_engine_q_.size_approx());

Review comment:
       This could be on a single log line.
   ```suggestion
     logger_->log_info("Created new [%p] script engine instance. Number of 
instances: approx. %d / %d.", engine.get(), script_engine_q_.size_approx(), 
getMaxConcurrentTasks());
   ```

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd
+
+std::string GetCurrentWorkingDir(void) {
+  char buff[FILENAME_MAX];
+  GetCurrentDir(buff, FILENAME_MAX);
+  std::string current_working_dir(buff);
+  return current_working_dir;
+}
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string createTempDir() {
+      char dirtemplate[] = "/tmp/gt.XXXXXX";
+      std::string temp_dir = testController_->createTempDirectory(dirtemplate);
+      REQUIRE(!temp_dir.empty());
+      struct stat buffer;
+      REQUIRE(-1 != stat(temp_dir.c_str(), &buffer));
+      REQUIRE(S_ISDIR(buffer.st_mode));
+      return temp_dir;
+    }
+
+    std::string putFileToDir(const std::string& dir_path, const std::string& 
file_name, const std::string& content) {
+      std::string file_path(dir_path + utils::file::FileUtils::get_separator() 
+ file_name);
+      std::ofstream out_file(file_path);
+      if (out_file.is_open()) {
+        out_file << content;
+        out_file.close();
+      }
+      return file_path;
+    }
+
+    std::string createTempDirWithFile(const std::string& file_name, const 
std::string& content) {
+      std::string temp_dir = createTempDir();
+      putFileToDir(temp_dir, file_name, content);
+      return temp_dir;
+    }
+
+    std::string getFileContent(const std::string& file_name) {
+      std::ifstream file_handle(file_name);
+      REQUIRE(file_handle.is_open());
+      const std::string file_content{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+      file_handle.close();
+      return file_content;
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    const std::string TEST_FILE_NAME{ "test_file.txt" };
+    const std::string TEST_FILE_CONTENT{ "Test text\n" };
+    const std::string SCRIPT_FILES_DIRECTORY{ "test_scripts" };
+
+    std::unique_ptr<TestController> testController_;
+    std::shared_ptr<TestPlan> plan_;
+    LogTestController& logTestController_;
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase 
{
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const 
core::Relationship& execute_python_out_conn, const std::string& 
used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(TEST_FILE_NAME, 
TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir();
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir();
+
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor -> initialize();
+
+    addPutFileProcessorToPlan(core::Relationship("success", "description"), 
output_dir);
+    plan_->runNextProcessor();  // ExecutePythonProcessor
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // ExecutePythonProcessor
+    }
+    plan_->runNextProcessor();  // PutFile
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // PutFile
+      const std::string state_name = std::to_string(i);
+      const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() + state_name;
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(output_file_content == state_name);
+    }
+  }
+
+ private:
+  std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const 
std::string& dir_path) {
+    std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", 
"getfileCreate2");
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path);
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), 
"true");
+    return getfile;
+  }
+
+  std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const 
std::string& used_as_script_file, const std::string& used_as_script_body) {
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", 
core::Relationship("success", "description"), true);
+    if ("" != used_as_script_file) {
+      plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath(used_as_script_file));
+    }
+    if ("" != used_as_script_body) {
+        plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(),
 getFileContent(getScriptFullPath(used_as_script_body)));
+    }
+    executePythonProcessor -> initialize();
+    return executePythonProcessor;
+  }
+
+  std::shared_ptr<core::Processor> addPutFileProcessorToPlan(const 
core::Relationship& execute_python_outbound_connection, const std::string& 
dir_path) {
+    std::shared_ptr<core::Processor> putfile = plan_->addProcessor("PutFile", 
"putfile", execute_python_outbound_connection, true);
+    plan_->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir_path);
+    return putfile;
+  }
+};
+
+// Test for python processors for simple passthrough cases
+//
+// +-----------+     +--------------------------+       +-----------+
+// |  Getfile  |   .-+  ExecutePythonProcessor  |     .-+  PutFile  |
+// +-----------+  /  +  -  -  -  -  -  -  -  -  +    /  +-----------+
+// |  success  +-°   |    Attribute: Script     |   /   |  success  +-+ checked
+// +-----------+     +--------------------------+  /    +-----------+
+//                   |         success          +-°
+//                   +--------------------------+
+//                   |         failure          +-X either success or failure 
is hooked up
+//                   +--------------------------+
+//

Review comment:
       I'm against ASCII art in the code. GetFile -> ExecutePythonProcessor -> 
PutFile is enough detail in a comment IMO.
   
   
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#nl3-keep-comments-crisp

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd
+
+std::string GetCurrentWorkingDir(void) {
+  char buff[FILENAME_MAX];
+  GetCurrentDir(buff, FILENAME_MAX);
+  std::string current_working_dir(buff);
+  return current_working_dir;
+}
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string createTempDir() {
+      char dirtemplate[] = "/tmp/gt.XXXXXX";
+      std::string temp_dir = testController_->createTempDirectory(dirtemplate);
+      REQUIRE(!temp_dir.empty());
+      struct stat buffer;
+      REQUIRE(-1 != stat(temp_dir.c_str(), &buffer));
+      REQUIRE(S_ISDIR(buffer.st_mode));
+      return temp_dir;
+    }
+
+    std::string putFileToDir(const std::string& dir_path, const std::string& 
file_name, const std::string& content) {
+      std::string file_path(dir_path + utils::file::FileUtils::get_separator() 
+ file_name);
+      std::ofstream out_file(file_path);
+      if (out_file.is_open()) {
+        out_file << content;
+        out_file.close();
+      }
+      return file_path;
+    }
+
+    std::string createTempDirWithFile(const std::string& file_name, const 
std::string& content) {
+      std::string temp_dir = createTempDir();
+      putFileToDir(temp_dir, file_name, content);
+      return temp_dir;
+    }
+
+    std::string getFileContent(const std::string& file_name) {
+      std::ifstream file_handle(file_name);
+      REQUIRE(file_handle.is_open());
+      const std::string file_content{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+      file_handle.close();
+      return file_content;
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    const std::string TEST_FILE_NAME{ "test_file.txt" };
+    const std::string TEST_FILE_CONTENT{ "Test text\n" };
+    const std::string SCRIPT_FILES_DIRECTORY{ "test_scripts" };
+
+    std::unique_ptr<TestController> testController_;
+    std::shared_ptr<TestPlan> plan_;
+    LogTestController& logTestController_;
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase 
{
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const 
core::Relationship& execute_python_out_conn, const std::string& 
used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(TEST_FILE_NAME, 
TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir();
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir();
+
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor -> initialize();
+
+    addPutFileProcessorToPlan(core::Relationship("success", "description"), 
output_dir);
+    plan_->runNextProcessor();  // ExecutePythonProcessor
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // ExecutePythonProcessor
+    }
+    plan_->runNextProcessor();  // PutFile
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // PutFile
+      const std::string state_name = std::to_string(i);
+      const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() + state_name;
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(output_file_content == state_name);
+    }
+  }
+
+ private:
+  std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const 
std::string& dir_path) {
+    std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", 
"getfileCreate2");
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path);
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), 
"true");
+    return getfile;
+  }
+
+  std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const 
std::string& used_as_script_file, const std::string& used_as_script_body) {
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", 
core::Relationship("success", "description"), true);
+    if ("" != used_as_script_file) {
+      plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath(used_as_script_file));
+    }
+    if ("" != used_as_script_body) {
+        plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(),
 getFileContent(getScriptFullPath(used_as_script_body)));
+    }
+    executePythonProcessor -> initialize();
+    return executePythonProcessor;
+  }
+
+  std::shared_ptr<core::Processor> addPutFileProcessorToPlan(const 
core::Relationship& execute_python_outbound_connection, const std::string& 
dir_path) {
+    std::shared_ptr<core::Processor> putfile = plan_->addProcessor("PutFile", 
"putfile", execute_python_outbound_connection, true);
+    plan_->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir_path);
+    return putfile;
+  }
+};
+
+// Test for python processors for simple passthrough cases
+//
+// +-----------+     +--------------------------+       +-----------+
+// |  Getfile  |   .-+  ExecutePythonProcessor  |     .-+  PutFile  |
+// +-----------+  /  +  -  -  -  -  -  -  -  -  +    /  +-----------+
+// |  success  +-°   |    Attribute: Script     |   /   |  success  +-+ checked
+// +-----------+     +--------------------------+  /    +-----------+
+//                   |         success          +-°
+//                   +--------------------------+
+//                   |         failure          +-X either success or failure 
is hooked up
+//                   +--------------------------+
+//
+// testSimpleFilePassthrough(OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", 
"passthrough_processor_transfering_to_success.py");
+//
+// translates to
+//
+// GIVEN an ExecutePythonProcessor set up with a "Script Body" attribute that 
transfers to REL_SUCCESS, but not "Script File"
+// WHEN the processor is triggered
+// THEN any consumer using the success relationship as source should receive 
the transfered data
+//
+TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", 
"[executePythonProcessorSimple]") {
+  // Expectations
+  const auto OUTPUT_FILE_MATCHES_INPUT          = 
SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT;
+  const auto RUNTIME_RELATIONSHIP_EXCEPTION    = 
SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION;
+  const auto PROCESSOR_INITIALIZATION_EXCEPTION = 
SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION;
+  // ExecutePython outbound relationships
+  const core::Relationship SUCCESS {"success", "description"};
+  const core::Relationship FAILURE{"failure", "description"};
+
+  // For the tests "" is threated as none-provided since no optional 
implementation was ported to the project yet
+
+  ////////////////////////////////////////////////////////////
+  // 0. Neither valid script file nor script body provided  //
+  ////////////////////////////////////////////////////////////
+
+  //                                          TEST EXPECTATION  OUT_REL        
USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS,       
                "", "");  // NOLINT
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE,       
                "", "");  // NOLINT
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, 
"non_existent_script.py", "");  // NOLINT
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, 
"non_existent_script.py", "");  // NOLINT
+
+  ///////////////////////////////////////
+  // 1. Using script file as attribute //
+  ///////////////////////////////////////
+
+  //                                      TEST EXPECTATION  OUT_REL            
                     USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, SUCCESS, 
"passthrough_processor_transfering_to_success.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, 
"passthrough_processor_transfering_to_success.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, 
"passthrough_processor_transfering_to_failure.py", "");  // NOLINT
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, FAILURE, 
"passthrough_processor_transfering_to_failure.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE,           
        "non_transferring_processor.py", "");  // NOLINT
+
+  ///////////////////////////////////////
+  // 2. Using script body as attribute //
+  ///////////////////////////////////////
+
+  //                                      TEST EXPECTATION  OUT_REL  
SCRIPT_FILE                        USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", 
"passthrough_processor_transfering_to_success.py");  // NOLINT 
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", 
"passthrough_processor_transfering_to_success.py");  // NOLINT 
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", 
"passthrough_processor_transfering_to_failure.py");  // NOLINT 
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", 
"passthrough_processor_transfering_to_failure.py");  // NOLINT 
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "",       
            "non_transferring_processor.py");  // NOLINT
+
+  ////////////////////////////////
+  // 3. Setting both attributes //
+  ////////////////////////////////
+

Review comment:
       The comment lines consisting entirely of `/` characters and the empty lines are redundant IMO.
   
   
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#nl3-keep-comments-crisp

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();

Review comment:
       Reload in onSchedule should stay even in the future IMO, as users should 
have a way to apply their script changes. For this reason, I recommend removing 
the TODO and issue comments from here. The issue refers to avoiding reload in 
onTrigger in the future, which would improve performance.
   
   Idea (100% optional):
   As an optimization, we may want to check the last-modified timestamp of the 
file before reading it, so that an unchanged script is not re-read.

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
 
-    triggerSchedule(engine, context);
+    engine->eval(script_to_exec_);
+    engine->onSchedule(context);
 
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-  } catch (...) {
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
   }
 }
 
 void ExecutePythonProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+  if (!valid_init_) {
+    throw std::runtime_error("Could not correctly initialize " + getName());
+  }
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
 
-    triggerEngineProcessor(engine, context, session);
-
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+    engine->onTrigger(context, session);
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception &exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
     this->yield();
-  } catch (...) {
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
     this->yield();
   }
 }
 
+// TODO(hunyadi): This is potentially not what we want. See 
https://issues.apache.org/jira/browse/MINIFICPP-1222
+std::shared_ptr<python::PythonScriptEngine> 
ExecutePythonProcessor::getScriptEngine() {
+  std::shared_ptr<python::PythonScriptEngine> engine;
+  // Use an existing engine, if one is available
+  if (script_engine_q_.try_dequeue(engine)) {
+    logger_->log_debug("Using available [%p] script engine instance", 
engine.get());
+    return engine;
+  }
+  engine = createEngine<python::PythonScriptEngine>();
+  logger_->log_info("Created new [%p] script instance", engine.get());
+  logger_->log_info("Approximately %d script instances created for this 
processor", script_engine_q_.size_approx());
+  if (engine == nullptr) {
+    throw std::runtime_error("No script engine available");
+  }
+  return engine;
+}
+
+void 
ExecutePythonProcessor::handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&&
 engine) {
+  // Make engine available for use again
+  if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
+    logger_->log_debug("Releasing [%p] script engine", engine.get());
+    script_engine_q_.enqueue(engine);
+  } else {
+    logger_->log_info("Destroying script engine because it is no longer 
needed");
+  }
+}
+
+void ExecutePythonProcessor::appendPathForImportModules() {
+  // TODO(hunyadi): I have spent some time trying to figure out pybind11, but
+  // could not get this working yet. It is up to be implemented later
+  // https://issues.apache.org/jira/browse/MINIFICPP-1224
+  if ("" != module_directory_) {
+    logger_ -> log_error("Not supported property: Module Directory.");

Review comment:
       I think spaces around the `->` operator look really strange and they are 
inconsistent with the file (or the codebase).

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd
+
+std::string GetCurrentWorkingDir(void) {
+  char buff[FILENAME_MAX];
+  GetCurrentDir(buff, FILENAME_MAX);
+  std::string current_working_dir(buff);
+  return current_working_dir;
+}
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string createTempDir() {
+      char dirtemplate[] = "/tmp/gt.XXXXXX";
+      std::string temp_dir = testController_->createTempDirectory(dirtemplate);
+      REQUIRE(!temp_dir.empty());
+      struct stat buffer;
+      REQUIRE(-1 != stat(temp_dir.c_str(), &buffer));
+      REQUIRE(S_ISDIR(buffer.st_mode));
+      return temp_dir;
+    }
+
+    std::string putFileToDir(const std::string& dir_path, const std::string& 
file_name, const std::string& content) {
+      std::string file_path(dir_path + utils::file::FileUtils::get_separator() 
+ file_name);
+      std::ofstream out_file(file_path);
+      if (out_file.is_open()) {
+        out_file << content;
+        out_file.close();
+      }
+      return file_path;
+    }
+
+    std::string createTempDirWithFile(const std::string& file_name, const 
std::string& content) {
+      std::string temp_dir = createTempDir();
+      putFileToDir(temp_dir, file_name, content);
+      return temp_dir;
+    }
+
+    std::string getFileContent(const std::string& file_name) {
+      std::ifstream file_handle(file_name);
+      REQUIRE(file_handle.is_open());
+      const std::string file_content{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+      file_handle.close();
+      return file_content;
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    const std::string TEST_FILE_NAME{ "test_file.txt" };
+    const std::string TEST_FILE_CONTENT{ "Test text\n" };
+    const std::string SCRIPT_FILES_DIRECTORY{ "test_scripts" };

Review comment:
       I think we only need one object of each of these, not one per instance. They 
should go outside of the class, become `static`, or possibly both (with 
`static` referring to linkage in this case).

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd
+
+std::string GetCurrentWorkingDir(void) {
+  char buff[FILENAME_MAX];
+  GetCurrentDir(buff, FILENAME_MAX);
+  std::string current_working_dir(buff);
+  return current_working_dir;
+}
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string createTempDir() {
+      char dirtemplate[] = "/tmp/gt.XXXXXX";
+      std::string temp_dir = testController_->createTempDirectory(dirtemplate);
+      REQUIRE(!temp_dir.empty());
+      struct stat buffer;
+      REQUIRE(-1 != stat(temp_dir.c_str(), &buffer));
+      REQUIRE(S_ISDIR(buffer.st_mode));
+      return temp_dir;
+    }
+
+    std::string putFileToDir(const std::string& dir_path, const std::string& 
file_name, const std::string& content) {
+      std::string file_path(dir_path + utils::file::FileUtils::get_separator() 
+ file_name);
+      std::ofstream out_file(file_path);
+      if (out_file.is_open()) {
+        out_file << content;
+        out_file.close();
+      }
+      return file_path;
+    }
+
+    std::string createTempDirWithFile(const std::string& file_name, const 
std::string& content) {
+      std::string temp_dir = createTempDir();
+      putFileToDir(temp_dir, file_name, content);
+      return temp_dir;
+    }
+
+    std::string getFileContent(const std::string& file_name) {
+      std::ifstream file_handle(file_name);
+      REQUIRE(file_handle.is_open());
+      const std::string file_content{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+      file_handle.close();
+      return file_content;
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    const std::string TEST_FILE_NAME{ "test_file.txt" };
+    const std::string TEST_FILE_CONTENT{ "Test text\n" };
+    const std::string SCRIPT_FILES_DIRECTORY{ "test_scripts" };
+
+    std::unique_ptr<TestController> testController_;
+    std::shared_ptr<TestPlan> plan_;
+    LogTestController& logTestController_;
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase 
{
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const 
core::Relationship& execute_python_out_conn, const std::string& 
used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(TEST_FILE_NAME, 
TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir();
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir();
+
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor -> initialize();
+
+    addPutFileProcessorToPlan(core::Relationship("success", "description"), 
output_dir);
+    plan_->runNextProcessor();  // ExecutePythonProcessor
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // ExecutePythonProcessor
+    }
+    plan_->runNextProcessor();  // PutFile
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // PutFile
+      const std::string state_name = std::to_string(i);
+      const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() + state_name;
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(output_file_content == state_name);
+    }
+  }
+
+ private:
+  std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const 
std::string& dir_path) {
+    std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", 
"getfileCreate2");
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path);
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), 
"true");
+    return getfile;
+  }
+
+  std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const 
std::string& used_as_script_file, const std::string& used_as_script_body) {
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", 
core::Relationship("success", "description"), true);
+    if ("" != used_as_script_file) {
+      plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath(used_as_script_file));
+    }
+    if ("" != used_as_script_body) {
+        plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(),
 getFileContent(getScriptFullPath(used_as_script_body)));
+    }
+    executePythonProcessor -> initialize();
+    return executePythonProcessor;
+  }
+
+  std::shared_ptr<core::Processor> addPutFileProcessorToPlan(const 
core::Relationship& execute_python_outbound_connection, const std::string& 
dir_path) {
+    std::shared_ptr<core::Processor> putfile = plan_->addProcessor("PutFile", 
"putfile", execute_python_outbound_connection, true);
+    plan_->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir_path);
+    return putfile;
+  }
+};
+
+// Test for python processors for simple passthrough cases
+//
+// +-----------+     +--------------------------+       +-----------+
+// |  Getfile  |   .-+  ExecutePythonProcessor  |     .-+  PutFile  |
+// +-----------+  /  +  -  -  -  -  -  -  -  -  +    /  +-----------+
+// |  success  +-°   |    Attribute: Script     |   /   |  success  +-+ checked
+// +-----------+     +--------------------------+  /    +-----------+
+//                   |         success          +-°
+//                   +--------------------------+
+//                   |         failure          +-X either success or failure 
is hooked up
+//                   +--------------------------+
+//
+// testSimpleFilePassthrough(OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", 
"passthrough_processor_transfering_to_success.py");
+//
+// translates to
+//
+// GIVEN an ExecutePythonProcessor set up with a "Script Body" attribute that 
transfers to REL_SUCCESS, but not "Script File"
+// WHEN the processor is triggered
+// THEN any consumer using the success relationship as source should receive 
the transfered data
+//
+TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", 
"[executePythonProcessorSimple]") {
+  // Expectations
+  const auto OUTPUT_FILE_MATCHES_INPUT          = 
SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT;
+  const auto RUNTIME_RELATIONSHIP_EXCEPTION    = 
SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION;
+  const auto PROCESSOR_INITIALIZATION_EXCEPTION = 
SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION;
+  // ExecutePython outbound relationships
+  const core::Relationship SUCCESS {"success", "description"};
+  const core::Relationship FAILURE{"failure", "description"};
+
+  // For the tests "" is threated as none-provided since no optional 
implementation was ported to the project yet
+
+  ////////////////////////////////////////////////////////////
+  // 0. Neither valid script file nor script body provided  //
+  ////////////////////////////////////////////////////////////
+
+  //                                          TEST EXPECTATION  OUT_REL        
USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS,       
                "", "");  // NOLINT

Review comment:
       Comment blocks like this are a bad sign IMO. I like the content in this 
case, just not the formatting, because it distracts attention from the code to 
the comment.
   ```suggestion
     // Neither valid script file nor script body provided
     //                                          TEST EXPECTATION  OUT_REL      
  USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
     testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS,     
                  "", "");  // NOLINT
   ```

##########
File path: libminifi/test/script-tests/CMakeLists.txt
##########
@@ -19,6 +19,13 @@
 
 if (NOT DISABLE_PYTHON_SCRIPTING)
        file(GLOB EXECUTESCRIPT_PYTHON_INTEGRATION_TESTS  "Python*.cpp")
+       file(GLOB EXECUTEPYTHONPROCESSOR_UNIT_TESTS  
"ExecutePythonProcessorTests.cpp")
+       file(GLOB PY_SOURCES  "python/*.cpp")
+       find_package(PythonLibs 3.5)
+       if (NOT PYTHONLIBS_FOUND)
+               find_package(PythonLibs 3.0 REQUIRED)
+       endif()

Review comment:
       I think we require python 3.5 on all platforms, so a fallback to 3.0 
doesn't seem to provide any value as far as I understand. The version argument 
of `find_package` takes a minimum required version AFAIK, so the above code is 
equivalent to just `find_package(PythonLibs 3.0 REQUIRED)`.
   
   If my assumptions are correct, we should simply require Python 3.5.
   ```suggestion
        find_package(PythonLibs 3.5 REQUIRED)
   ```

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd

Review comment:
       Why do we need this macro? Why not just call `getcwd()`?

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {

Review comment:
       I think `!script_to_exec_.empty()` (or `not script_to_exec_.empty()`) 
would be more readable.

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
 
-    triggerSchedule(engine, context);
+    engine->eval(script_to_exec_);
+    engine->onSchedule(context);
 
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-  } catch (...) {
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
   }
 }
 
 void ExecutePythonProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+  if (!valid_init_) {
+    throw std::runtime_error("Could not correctly initialize " + getName());
+  }
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
 
-    triggerEngineProcessor(engine, context, session);
-
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+    engine->onTrigger(context, session);
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception &exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
     this->yield();
-  } catch (...) {
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
     this->yield();
   }
 }
 
+// TODO(hunyadi): This is potentially not what we want. See 
https://issues.apache.org/jira/browse/MINIFICPP-1222
+std::shared_ptr<python::PythonScriptEngine> 
ExecutePythonProcessor::getScriptEngine() {
+  std::shared_ptr<python::PythonScriptEngine> engine;
+  // Use an existing engine, if one is available
+  if (script_engine_q_.try_dequeue(engine)) {
+    logger_->log_debug("Using available [%p] script engine instance", 
engine.get());
+    return engine;
+  }
+  engine = createEngine<python::PythonScriptEngine>();
+  logger_->log_info("Created new [%p] script instance", engine.get());
+  logger_->log_info("Approximately %d script instances created for this 
processor", script_engine_q_.size_approx());
+  if (engine == nullptr) {
+    throw std::runtime_error("No script engine available");
+  }
+  return engine;
+}
+
+void 
ExecutePythonProcessor::handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&&
 engine) {
+  // Make engine available for use again
+  if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
+    logger_->log_debug("Releasing [%p] script engine", engine.get());
+    script_engine_q_.enqueue(engine);
+  } else {
+    logger_->log_info("Destroying script engine because it is no longer 
needed");
+  }
+}
+
+void ExecutePythonProcessor::appendPathForImportModules() {
+  // TODO(hunyadi): I have spent some time trying to figure out pybind11, but
+  // could not get this working yet. It is up to be implemented later
+  // https://issues.apache.org/jira/browse/MINIFICPP-1224
+  if ("" != module_directory_) {
+    logger_ -> log_error("Not supported property: Module Directory.");
+  }
+
+}
+
+void ExecutePythonProcessor::loadScriptFromFile(const std::string& file_path) {
+  std::ifstream file_handle(file_path);
+  if (!file_handle.is_open()) {
+    script_to_exec_ = "";
+    throw std::runtime_error("Failed to read Script File: " + file_path);
+  }
+  script_to_exec_ = std::string{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+  file_handle.close();

Review comment:
       In the `script_to_exec_` assignment line, the spacing inside the `{}`s 
is inconsistent. Either use spaces on both sides or neither, IMO.
   
   `close()` at the end of the block is redundant: the dtor will close the file 
anyway.

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd
+
+std::string GetCurrentWorkingDir(void) {
+  char buff[FILENAME_MAX];
+  GetCurrentDir(buff, FILENAME_MAX);
+  std::string current_working_dir(buff);
+  return current_working_dir;
+}
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string createTempDir() {
+      char dirtemplate[] = "/tmp/gt.XXXXXX";
+      std::string temp_dir = testController_->createTempDirectory(dirtemplate);
+      REQUIRE(!temp_dir.empty());
+      struct stat buffer;
+      REQUIRE(-1 != stat(temp_dir.c_str(), &buffer));
+      REQUIRE(S_ISDIR(buffer.st_mode));
+      return temp_dir;
+    }
+
+    std::string putFileToDir(const std::string& dir_path, const std::string& 
file_name, const std::string& content) {
+      std::string file_path(dir_path + utils::file::FileUtils::get_separator() 
+ file_name);
+      std::ofstream out_file(file_path);
+      if (out_file.is_open()) {
+        out_file << content;
+        out_file.close();
+      }
+      return file_path;
+    }
+
+    std::string createTempDirWithFile(const std::string& file_name, const 
std::string& content) {
+      std::string temp_dir = createTempDir();
+      putFileToDir(temp_dir, file_name, content);
+      return temp_dir;
+    }
+
+    std::string getFileContent(const std::string& file_name) {
+      std::ifstream file_handle(file_name);
+      REQUIRE(file_handle.is_open());
+      const std::string file_content{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+      file_handle.close();
+      return file_content;
+    }

Review comment:
       These are generic enough that they could go to `FileUtils` IMO.
   
   I'd remove the `close()` calls (`std::ofstream::close` in `putFileToDir` and 
`std::ifstream::close` in `getFileContent`) because they are both right before 
`return`, after which the file is closed anyway. They don't hurt but they're 
not needed either.
   
   If we check for errors, we shouldn't then ignore them IMO. I mean if we 
check that the file stream opened (as `putFileToDir` does with `is_open()`), 
we should throw when it did not, instead of silently skipping the write.

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,276 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace {
+
+#include <unistd.h>
+#define GetCurrentDir getcwd
+
+std::string GetCurrentWorkingDir(void) {
+  char buff[FILENAME_MAX];
+  GetCurrentDir(buff, FILENAME_MAX);
+  std::string current_working_dir(buff);
+  return current_working_dir;
+}
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string createTempDir() {
+      char dirtemplate[] = "/tmp/gt.XXXXXX";
+      std::string temp_dir = testController_->createTempDirectory(dirtemplate);
+      REQUIRE(!temp_dir.empty());
+      struct stat buffer;
+      REQUIRE(-1 != stat(temp_dir.c_str(), &buffer));
+      REQUIRE(S_ISDIR(buffer.st_mode));
+      return temp_dir;
+    }
+
+    std::string putFileToDir(const std::string& dir_path, const std::string& 
file_name, const std::string& content) {
+      std::string file_path(dir_path + utils::file::FileUtils::get_separator() 
+ file_name);
+      std::ofstream out_file(file_path);
+      if (out_file.is_open()) {
+        out_file << content;
+        out_file.close();
+      }
+      return file_path;
+    }
+
+    std::string createTempDirWithFile(const std::string& file_name, const 
std::string& content) {
+      std::string temp_dir = createTempDir();
+      putFileToDir(temp_dir, file_name, content);
+      return temp_dir;
+    }
+
+    std::string getFileContent(const std::string& file_name) {
+      std::ifstream file_handle(file_name);
+      REQUIRE(file_handle.is_open());
+      const std::string file_content{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+      file_handle.close();
+      return file_content;
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    const std::string TEST_FILE_NAME{ "test_file.txt" };
+    const std::string TEST_FILE_CONTENT{ "Test text\n" };
+    const std::string SCRIPT_FILES_DIRECTORY{ "test_scripts" };
+
+    std::unique_ptr<TestController> testController_;
+    std::shared_ptr<TestPlan> plan_;
+    LogTestController& logTestController_;
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase 
{
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const 
core::Relationship& execute_python_out_conn, const std::string& 
used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(TEST_FILE_NAME, 
TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir();
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir();
+
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor -> initialize();
+
+    addPutFileProcessorToPlan(core::Relationship("success", "description"), 
output_dir);
+    plan_->runNextProcessor();  // ExecutePythonProcessor
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // ExecutePythonProcessor
+    }
+    plan_->runNextProcessor();  // PutFile
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // PutFile
+      const std::string state_name = std::to_string(i);
+      const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() + state_name;
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(output_file_content == state_name);
+    }
+  }
+
+ private:
+  std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const 
std::string& dir_path) {
+    std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", 
"getfileCreate2");
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path);
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), 
"true");
+    return getfile;
+  }
+
+  std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const 
std::string& used_as_script_file, const std::string& used_as_script_body) {
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", 
core::Relationship("success", "description"), true);
+    if ("" != used_as_script_file) {
+      plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath(used_as_script_file));
+    }
+    if ("" != used_as_script_body) {
+        plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(),
 getFileContent(getScriptFullPath(used_as_script_body)));
+    }
+    executePythonProcessor -> initialize();
+    return executePythonProcessor;
+  }
+
+  std::shared_ptr<core::Processor> addPutFileProcessorToPlan(const 
core::Relationship& execute_python_outbound_connection, const std::string& 
dir_path) {
+    std::shared_ptr<core::Processor> putfile = plan_->addProcessor("PutFile", 
"putfile", execute_python_outbound_connection, true);
+    plan_->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir_path);
+    return putfile;
+  }
+};
+
+// Test for python processors for simple passthrough cases
+//
+// +-----------+     +--------------------------+       +-----------+
+// |  Getfile  |   .-+  ExecutePythonProcessor  |     .-+  PutFile  |
+// +-----------+  /  +  -  -  -  -  -  -  -  -  +    /  +-----------+
+// |  success  +-°   |    Attribute: Script     |   /   |  success  +-+ checked
+// +-----------+     +--------------------------+  /    +-----------+
+//                   |         success          +-°
+//                   +--------------------------+
+//                   |         failure          +-X either success or failure 
is hooked up
+//                   +--------------------------+
+//
+// testSimpleFilePassthrough(OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", 
"passthrough_processor_transfering_to_success.py");
+//
+// translates to
+//
+// GIVEN an ExecutePythonProcessor set up with a "Script Body" attribute that 
transfers to REL_SUCCESS, but not "Script File"
+// WHEN the processor is triggered
+// THEN any consumer using the success relationship as source should receive 
the transfered data
+//
+TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", 
"[executePythonProcessorSimple]") {
+  // Expectations
+  const auto OUTPUT_FILE_MATCHES_INPUT          = 
SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT;
+  const auto RUNTIME_RELATIONSHIP_EXCEPTION    = 
SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION;
+  const auto PROCESSOR_INITIALIZATION_EXCEPTION = 
SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION;
+  // ExecutePython outbound relationships
+  const core::Relationship SUCCESS {"success", "description"};
+  const core::Relationship FAILURE{"failure", "description"};
+
+  // For the tests "" is threated as none-provided since no optional 
implementation was ported to the project yet

Review comment:
       typo: s/threated/treated/




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to