szaszm commented on a change in pull request #784:
URL: https://github.com/apache/nifi-minifi-cpp/pull/784#discussion_r435549817



##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -35,155 +35,188 @@ namespace python {
 namespace processors {
 
 core::Property ExecutePythonProcessor::ScriptFile("Script File",  // NOLINT
-    R"(Path to script file to execute)", "");
+    R"(Path to script file to execute. Only one of Script File or Script Body 
may be used)", "");
+core::Property ExecutePythonProcessor::ScriptBody("Script Body",  // NOLINT
+    R"(Script to execute. Only one of Script File or Script Body may be 
used)", "");
 core::Property ExecutePythonProcessor::ModuleDirectory("Module Directory",  // 
NOLINT
-    R"(Comma-separated list of paths to files and/or directories which
-                                                 contain modules required by 
the script)", "");
+    R"(Comma-separated list of paths to files and/or directories which contain 
modules required by the script)", "");
 
 core::Relationship ExecutePythonProcessor::Success("success", "Script 
successes");  // NOLINT
 core::Relationship ExecutePythonProcessor::Failure("failure", "Script 
failures");  // NOLINT
 
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if (script_to_exec_.size()) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
 
-    triggerSchedule(engine, context);
+    engine->eval(script_to_exec_);
+    engine->onSchedule(context);
 
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-  } catch (...) {
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
   }
 }
 
 void ExecutePythonProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+  if (!valid_init_) {
+    throw std::runtime_error("Could not correctly initialize " + getName());
+  }
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0

Review comment:
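       `s/on schedule/triggered/` — this copy of the TODO is in `onTrigger`, so 
it should read "every time the processor is triggered".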

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();

Review comment:
       The TODO comment should still be removed from here (but left in place in 
`onTrigger`). Rereading the script file in `onSchedule` is not a problem at 
all, and we shouldn't ever change that behavior in `onSchedule`.
   
   For clarification: `onSchedule` is called when we need to (re)configure the 
processor (rarely). `onTrigger` is called on processor execution, i.e. often.

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,220 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::createTempDir;
+using org::apache::nifi::minifi::utils::putFileToDir;
+using org::apache::nifi::minifi::utils::createTempDirWithFile;
+using org::apache::nifi::minifi::utils::getFileContent;
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    static const std::string TEST_FILE_NAME;
+    static const std::string TEST_FILE_CONTENT;
+    static const std::string SCRIPT_FILES_DIRECTORY;
+
+    std::unique_ptr<TestController> testController_;
+    std::shared_ptr<TestPlan> plan_;
+    LogTestController& logTestController_;
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+const std::string ExecutePythonProcessorTestBase::TEST_FILE_NAME{ 
"test_file.txt" };
+const std::string ExecutePythonProcessorTestBase::TEST_FILE_CONTENT{ "Test 
text\n" };
+const std::string ExecutePythonProcessorTestBase::SCRIPT_FILES_DIRECTORY{ 
"test_scripts" };
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase 
{
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const 
core::Relationship& execute_python_out_conn, const std::string& 
used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(testController_.get(), 
TEST_FILE_NAME, TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir(testController_.get());
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir(testController_.get());
+
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor -> initialize();

Review comment:
       There should be no spaces around `->`

##########
File path: libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
##########
@@ -0,0 +1,220 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "python/ExecutePythonProcessor.h"
+#include "processors/LogAttribute.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::createTempDir;
+using org::apache::nifi::minifi::utils::putFileToDir;
+using org::apache::nifi::minifi::utils::createTempDirWithFile;
+using org::apache::nifi::minifi::utils::getFileContent;
+
+class ExecutePythonProcessorTestBase {
+ public:
+    ExecutePythonProcessorTestBase() :
+      logTestController_(LogTestController::getInstance()),
+      
logger_(logging::LoggerFactory<org::apache::nifi::minifi::python::processors::ExecutePythonProcessor>::getLogger())
 {
+      reInitialize();
+    }
+    virtual ~ExecutePythonProcessorTestBase() {
+      logTestController_.reset();
+      logTestController_.setDebug<TestPlan>();
+      
logTestController_.setDebug<minifi::python::processors::ExecutePythonProcessor>();
+      logTestController_.setDebug<minifi::processors::PutFile>();
+      logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
+    }
+
+ protected:
+    void reInitialize() {
+      testController_.reset(new TestController());
+      plan_ = testController_->createPlan();
+    }
+
+    std::string getScriptFullPath(const std::string& script_file_name) {
+      return SCRIPT_FILES_DIRECTORY + utils::file::FileUtils::get_separator() 
+ script_file_name;
+    }
+
+    static const std::string TEST_FILE_NAME;
+    static const std::string TEST_FILE_CONTENT;
+    static const std::string SCRIPT_FILES_DIRECTORY;
+
+    std::unique_ptr<TestController> testController_;
+    std::shared_ptr<TestPlan> plan_;
+    LogTestController& logTestController_;
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+const std::string ExecutePythonProcessorTestBase::TEST_FILE_NAME{ 
"test_file.txt" };
+const std::string ExecutePythonProcessorTestBase::TEST_FILE_CONTENT{ "Test 
text\n" };
+const std::string ExecutePythonProcessorTestBase::SCRIPT_FILES_DIRECTORY{ 
"test_scripts" };
+
+class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase 
{
+ public:
+  enum class Expectation {
+    OUTPUT_FILE_MATCHES_INPUT,
+    RUNTIME_RELATIONSHIP_EXCEPTION,
+    PROCESSOR_INITIALIZATION_EXCEPTION
+  };
+  SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
+
+ protected:
+  void testSimpleFilePassthrough(const Expectation expectation, const 
core::Relationship& execute_python_out_conn, const std::string& 
used_as_script_file, const std::string& used_as_script_body) {
+    reInitialize();
+    const std::string input_dir = createTempDirWithFile(testController_.get(), 
TEST_FILE_NAME, TEST_FILE_CONTENT);
+    const std::string output_dir = createTempDir(testController_.get());
+
+    addGetFileProcessorToPlan(input_dir);
+    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
+      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+      return;
+    }
+    REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, 
used_as_script_body));
+    addPutFileProcessorToPlan(execute_python_out_conn, output_dir);
+
+    plan_->runNextProcessor();  // GetFile
+    if (Expectation::RUNTIME_RELATIONSHIP_EXCEPTION == expectation) {
+      REQUIRE_THROWS(plan_->runNextProcessor());  // ExecutePythonProcessor
+      return;
+    }
+    REQUIRE_NOTHROW(plan_->runNextProcessor());  // ExecutePythonProcessor
+    plan_->runNextProcessor();  // PutFile
+
+    const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() +  TEST_FILE_NAME;
+
+    if (Expectation::OUTPUT_FILE_MATCHES_INPUT == expectation) {
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(TEST_FILE_CONTENT == output_file_content);
+    }
+  }
+  void testsStatefulProcessor() {
+    reInitialize();
+    const std::string output_dir = createTempDir(testController_.get());
+
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
+    plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath("stateful_processor.py"));
+    executePythonProcessor -> initialize();
+
+    addPutFileProcessorToPlan(core::Relationship("success", "description"), 
output_dir);
+    plan_->runNextProcessor();  // ExecutePythonProcessor
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // ExecutePythonProcessor
+    }
+    plan_->runNextProcessor();  // PutFile
+    for (std::size_t i = 0; i < 10; ++i) {
+      plan_->runCurrentProcessor();  // PutFile
+      const std::string state_name = std::to_string(i);
+      const std::string output_file_path = output_dir + 
utils::file::FileUtils::get_separator() + state_name;
+      const std::string output_file_content{ getFileContent(output_file_path) 
};
+      REQUIRE(output_file_content == state_name);
+    }
+  }
+
+ private:
+  std::shared_ptr<core::Processor> addGetFileProcessorToPlan(const 
std::string& dir_path) {
+    std::shared_ptr<core::Processor> getfile = plan_->addProcessor("GetFile", 
"getfileCreate2");
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir_path);
+    plan_->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), 
"true");
+    return getfile;
+  }
+
+  std::shared_ptr<core::Processor> addExecutePythonProcessorToPlan(const 
std::string& used_as_script_file, const std::string& used_as_script_body) {
+    auto executePythonProcessor = 
plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor", 
core::Relationship("success", "description"), true);
+    if ("" != used_as_script_file) {
+      plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(),
 getScriptFullPath(used_as_script_file));
+    }
+    if ("" != used_as_script_body) {
+        plan_->setProperty(executePythonProcessor, 
org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(),
 getFileContent(getScriptFullPath(used_as_script_body)));
+    }
+    executePythonProcessor -> initialize();

Review comment:
       There should be no spaces around `->`

##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -46,144 +50,177 @@ core::Relationship 
ExecutePythonProcessor::Failure("failure", "Script failures")
 void ExecutePythonProcessor::initialize() {
   // initialization requires that we do a little leg work prior to onSchedule
   // so that we can provide manifest our processor identity
-  std::set<core::Property> properties;
-
-  std::string prop;
-  getProperty(ScriptFile.getName(), prop);
-
-  properties.insert(ScriptFile);
-  properties.insert(ModuleDirectory);
-  setSupportedProperties(properties);
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-  setAcceptAllProperties();
-  if (!prop.empty()) {
-    setProperty(ScriptFile, prop);
-    std::shared_ptr<script::ScriptEngine> engine;
-    python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  if (getProperties().empty()) {
+    setSupportedProperties({
+      ScriptFile,
+      ScriptBody,
+      ModuleDirectory
+    });
+    setAcceptAllProperties();
+    setSupportedRelationships({
+      Success,
+      Failure
+    });
+    valid_init_ = false;
+    return;
+  }
 
-    engine = createEngine<python::PythonScriptEngine>();
+  python_logger_ = 
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
-    if (engine == nullptr) {
-      throw std::runtime_error("No script engine available");
-    }
+  getProperty(ModuleDirectory.getName(), module_directory_);
 
-    try {
-      engine->evalFile(prop);
-      auto me = shared_from_this();
-      triggerDescribe(engine, me);
-      triggerInitialize(engine, me);
+  valid_init_ = false;
+  appendPathForImportModules();
+  loadScript();
+  try {
+    if ("" != script_to_exec_) {
+      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+      engine->eval(script_to_exec_);
+      auto shared_this = shared_from_this();
+      engine->describe(shared_this);
+      engine->onInitialize(shared_this);
+      handleEngineNoLongerInUse(std::move(engine));
       valid_init_ = true;
-    } catch (std::exception &exception) {
-      logger_->log_error("Caught Exception %s", exception.what());
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
-    } catch (...) {
-      logger_->log_error("Caught Exception");
-      engine = nullptr;
-      std::rethrow_exception(std::current_exception());
-      valid_init_ = false;
     }
-
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+    std::rethrow_exception(std::current_exception());
+  }
+  catch (...) {
+    logger_->log_error("Caught Exception");
+    std::rethrow_exception(std::current_exception());
   }
 }
 
 void ExecutePythonProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (!valid_init_) {
-    throw std::runtime_error("Could not correctly in initialize " + getName());
-  }
-  context->getProperty(ScriptFile.getName(), script_file_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
-  if (script_file_.empty() && script_engine_.empty()) {
-    logger_->log_error("Script File must be defined");
-    return;
+    throw std::runtime_error("Could not correctly initialize " + getName());
   }
-
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
 
-    triggerSchedule(engine, context);
+    engine->eval(script_to_exec_);
+    engine->onSchedule(context);
 
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-  } catch (...) {
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception& exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
   }
 }
 
 void ExecutePythonProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+  if (!valid_init_) {
+    throw std::runtime_error("Could not correctly initialize " + getName());
+  }
   try {
-    std::shared_ptr<script::ScriptEngine> engine;
-
-    // Use an existing engine, if one is available
-    if (script_engine_q_.try_dequeue(engine)) {
-      logger_->log_debug("Using available %s script engine instance", 
script_engine_);
-    } else {
-      logger_->log_info("Creating new %s script instance", script_engine_);
-      logger_->log_info("Approximately %d %s script instances created for this 
processor", script_engine_q_.size_approx(), script_engine_);
-
-      engine = createEngine<python::PythonScriptEngine>();
-
-      if (engine == nullptr) {
-        throw std::runtime_error("No script engine available");
-      }
-
-      if (!script_file_.empty()) {
-        engine->evalFile(script_file_);
-      } else {
-        throw std::runtime_error("No Script File is available to execute");
-      }
+    // TODO(hunyadi): When using "Script File" property, we currently re-read 
the script file content every time the processor is on schedule. This should 
change to single-read when we release 1.0.0
+    // https://issues.apache.org/jira/browse/MINIFICPP-1223
+    reloadScriptIfUsingScriptFileProperty();
+    if (script_to_exec_.empty()) {
+      throw std::runtime_error("Neither Script Body nor Script File is 
available to execute");
     }
 
-    triggerEngineProcessor(engine, context, session);
-
-    // Make engine available for use again
-    if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing %s script engine", script_engine_);
-      script_engine_q_.enqueue(engine);
-    } else {
-      logger_->log_info("Destroying script engine because it is no longer 
needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
+    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+    engine->onTrigger(context, session);
+    handleEngineNoLongerInUse(std::move(engine));
+  }
+  catch (const std::exception &exception) {
+    logger_->log_error("Caught Exception: %s", exception.what());
     this->yield();
-  } catch (...) {
+  }
+  catch (...) {
     logger_->log_error("Caught Exception");
     this->yield();
   }
 }
 
+// TODO(hunyadi): This is potentially not what we want. See 
https://issues.apache.org/jira/browse/MINIFICPP-1222
+std::shared_ptr<python::PythonScriptEngine> 
ExecutePythonProcessor::getScriptEngine() {
+  std::shared_ptr<python::PythonScriptEngine> engine;
+  // Use an existing engine, if one is available
+  if (script_engine_q_.try_dequeue(engine)) {
+    logger_->log_debug("Using available [%p] script engine instance", 
engine.get());
+    return engine;
+  }
+  engine = createEngine<python::PythonScriptEngine>();
+  logger_->log_info("Created new [%p] script instance", engine.get());
+  logger_->log_info("Approximately %d script instances created for this 
processor", script_engine_q_.size_approx());
+  if (engine == nullptr) {
+    throw std::runtime_error("No script engine available");
+  }
+  return engine;
+}
+
+void 
ExecutePythonProcessor::handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&&
 engine) {
+  // Make engine available for use again
+  if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
+    logger_->log_debug("Releasing [%p] script engine", engine.get());
+    script_engine_q_.enqueue(engine);
+  } else {
+    logger_->log_info("Destroying script engine because it is no longer 
needed");
+  }
+}
+
+void ExecutePythonProcessor::appendPathForImportModules() {
+  // TODO(hunyadi): I have spent some time trying to figure out pybind11, but
+  // could not get this working yet. It is up to be implemented later
+  // https://issues.apache.org/jira/browse/MINIFICPP-1224
+  if ("" != module_directory_) {
+    logger_ -> log_error("Not supported property: Module Directory.");
+  }
+
+}
+
+void ExecutePythonProcessor::loadScriptFromFile(const std::string& file_path) {
+  std::ifstream file_handle(file_path);
+  if (!file_handle.is_open()) {
+    script_to_exec_ = "";
+    throw std::runtime_error("Failed to read Script File: " + file_path);
+  }
+  script_to_exec_ = std::string{ 
(std::istreambuf_iterator<char>(file_handle)), 
(std::istreambuf_iterator<char>())};
+  file_handle.close();

Review comment:
       Not yet resolved




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to