hunyadi-dev commented on a change in pull request #784:
URL: https://github.com/apache/nifi-minifi-cpp/pull/784#discussion_r435867932
##########
File path: extensions/script/python/ExecutePythonProcessor.cpp
##########
@@ -35,155 +35,185 @@ namespace python {
namespace processors {
core::Property ExecutePythonProcessor::ScriptFile("Script File", // NOLINT
- R"(Path to script file to execute)", "");
+ R"(Path to script file to execute. Only one of Script File or Script Body
may be used)", "");
+core::Property ExecutePythonProcessor::ScriptBody("Script Body", // NOLINT
+ R"(Script to execute. Only one of Script File or Script Body may be
used)", "");
core::Property ExecutePythonProcessor::ModuleDirectory("Module Directory", //
NOLINT
- R"(Comma-separated list of paths to files and/or directories which
- contain modules required by
the script)", "");
+ R"(Comma-separated list of paths to files and/or directories which contain
modules required by the script)", "");
core::Relationship ExecutePythonProcessor::Success("success", "Script
successes"); // NOLINT
core::Relationship ExecutePythonProcessor::Failure("failure", "Script
failures"); // NOLINT
void ExecutePythonProcessor::initialize() {
// initialization requires that we do a little leg work prior to onSchedule
// so that we can provide manifest our processor identity
- std::set<core::Property> properties;
-
- std::string prop;
- getProperty(ScriptFile.getName(), prop);
-
- properties.insert(ScriptFile);
- properties.insert(ModuleDirectory);
- setSupportedProperties(properties);
-
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- relationships.insert(Failure);
- setSupportedRelationships(std::move(relationships));
- setAcceptAllProperties();
- if (!prop.empty()) {
- setProperty(ScriptFile, prop);
- std::shared_ptr<script::ScriptEngine> engine;
- python_logger_ =
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+ if (getProperties().empty()) {
+ setSupportedProperties({
+ ScriptFile,
+ ScriptBody,
+ ModuleDirectory
+ });
+ setAcceptAllProperties();
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+ valid_init_ = false;
+ return;
+ }
- engine = createEngine<python::PythonScriptEngine>();
+ python_logger_ =
logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
- if (engine == nullptr) {
- throw std::runtime_error("No script engine available");
- }
+ getProperty(ModuleDirectory.getName(), module_directory_);
- try {
- engine->evalFile(prop);
- auto me = shared_from_this();
- triggerDescribe(engine, me);
- triggerInitialize(engine, me);
+ valid_init_ = false;
+ appendPathForImportModules();
+ loadScript();
+ try {
+ if (script_to_exec_.size()) {
+ std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+ engine->eval(script_to_exec_);
+ auto shared_this = shared_from_this();
+ engine->describe(shared_this);
+ engine->onInitialize(shared_this);
+ handleEngineNoLongerInUse(std::move(engine));
valid_init_ = true;
- } catch (std::exception &exception) {
- logger_->log_error("Caught Exception %s", exception.what());
- engine = nullptr;
- std::rethrow_exception(std::current_exception());
- valid_init_ = false;
- } catch (...) {
- logger_->log_error("Caught Exception");
- engine = nullptr;
- std::rethrow_exception(std::current_exception());
- valid_init_ = false;
}
-
+ }
+ catch (const std::exception& exception) {
+ logger_->log_error("Caught Exception: %s", exception.what());
+ std::rethrow_exception(std::current_exception());
+ }
+ catch (...) {
+ logger_->log_error("Caught Exception");
+ std::rethrow_exception(std::current_exception());
}
}
void ExecutePythonProcessor::onSchedule(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (!valid_init_) {
- throw std::runtime_error("Could not correctly in initialize " + getName());
- }
- context->getProperty(ScriptFile.getName(), script_file_);
- context->getProperty(ModuleDirectory.getName(), module_directory_);
- if (script_file_.empty() && script_engine_.empty()) {
- logger_->log_error("Script File must be defined");
- return;
+ throw std::runtime_error("Could not correctly initialize " + getName());
}
-
try {
- std::shared_ptr<script::ScriptEngine> engine;
-
- // Use an existing engine, if one is available
- if (script_engine_q_.try_dequeue(engine)) {
- logger_->log_debug("Using available %s script engine instance",
script_engine_);
- } else {
- logger_->log_info("Creating new %s script instance", script_engine_);
- logger_->log_info("Approximately %d %s script instances created for this
processor", script_engine_q_.size_approx(), script_engine_);
-
- engine = createEngine<python::PythonScriptEngine>();
-
- if (engine == nullptr) {
- throw std::runtime_error("No script engine available");
- }
-
- if (!script_file_.empty()) {
- engine->evalFile(script_file_);
- } else {
- throw std::runtime_error("No Script File is available to execute");
- }
+ reloadScriptIfUsingScriptFileProperty();
+ if (script_to_exec_.empty()) {
+ throw std::runtime_error("Neither Script Body nor Script File is
available to execute");
}
+ std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
- triggerSchedule(engine, context);
+ engine->eval(script_to_exec_);
+ engine->onSchedule(context);
- // Make engine available for use again
- if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
- logger_->log_debug("Releasing %s script engine", script_engine_);
- script_engine_q_.enqueue(engine);
- } else {
- logger_->log_info("Destroying script engine because it is no longer
needed");
- }
- } catch (std::exception &exception) {
- logger_->log_error("Caught Exception %s", exception.what());
- } catch (...) {
+ handleEngineNoLongerInUse(std::move(engine));
+ }
+ catch (const std::exception& exception) {
+ logger_->log_error("Caught Exception: %s", exception.what());
+ }
+ catch (...) {
logger_->log_error("Caught Exception");
}
}
void ExecutePythonProcessor::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) {
+ if (!valid_init_) {
+ throw std::runtime_error("Could not correctly initialize " + getName());
+ }
try {
- std::shared_ptr<script::ScriptEngine> engine;
-
- // Use an existing engine, if one is available
- if (script_engine_q_.try_dequeue(engine)) {
- logger_->log_debug("Using available %s script engine instance",
script_engine_);
- } else {
- logger_->log_info("Creating new %s script instance", script_engine_);
- logger_->log_info("Approximately %d %s script instances created for this
processor", script_engine_q_.size_approx(), script_engine_);
-
- engine = createEngine<python::PythonScriptEngine>();
-
- if (engine == nullptr) {
- throw std::runtime_error("No script engine available");
- }
-
- if (!script_file_.empty()) {
- engine->evalFile(script_file_);
- } else {
- throw std::runtime_error("No Script File is available to execute");
- }
+ // TODO(hunyadi): When using "Script File" property, we currently re-read
the script file content every time the processor is triggered. This should
change to single-read when we release 1.0.0
+ // https://issues.apache.org/jira/browse/MINIFICPP-1223
+ reloadScriptIfUsingScriptFileProperty();
+ if (script_to_exec_.empty()) {
+ throw std::runtime_error("Neither Script Body nor Script File is
available to execute");
}
- triggerEngineProcessor(engine, context, session);
-
- // Make engine available for use again
- if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
- logger_->log_debug("Releasing %s script engine", script_engine_);
- script_engine_q_.enqueue(engine);
- } else {
- logger_->log_info("Destroying script engine because it is no longer
needed");
- }
- } catch (std::exception &exception) {
- logger_->log_error("Caught Exception %s", exception.what());
+ std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+ engine->onTrigger(context, session);
+ handleEngineNoLongerInUse(std::move(engine));
+ }
+ catch (const std::exception &exception) {
+ logger_->log_error("Caught Exception: %s", exception.what());
this->yield();
- } catch (...) {
+ }
+ catch (...) {
logger_->log_error("Caught Exception");
this->yield();
}
}
+// TODO(hunyadi): This is potentially not what we want. See
https://issues.apache.org/jira/browse/MINIFICPP-1222
+std::shared_ptr<python::PythonScriptEngine>
ExecutePythonProcessor::getScriptEngine() {
+ std::shared_ptr<python::PythonScriptEngine> engine;
+ // Use an existing engine, if one is available
+ if (script_engine_q_.try_dequeue(engine)) {
+ logger_->log_debug("Using available [%p] script engine instance",
engine.get());
+ return engine;
+ }
+ engine = createEngine<python::PythonScriptEngine>();
+ logger_->log_info("Created new [%p] script engine instance. Number of
instances: approx. %d / %d.", engine.get(), script_engine_q_.size_approx(),
getMaxConcurrentTasks());
+ if (engine == nullptr) {
+ throw std::runtime_error("No script engine available");
+ }
+ return engine;
+}
+
+void
ExecutePythonProcessor::handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&&
engine) {
+ // Make engine available for use again
+ if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
+ logger_->log_debug("Releasing [%p] script engine", engine.get());
+ script_engine_q_.enqueue(engine);
+ } else {
+ logger_->log_info("Destroying script engine because it is no longer
needed");
+ }
+}
+
+void ExecutePythonProcessor::appendPathForImportModules() {
+ // TODO(hunyadi): I have spent some time trying to figure out pybind11, but
+ // could not get this working yet. It is up to be implemented later
+ // https://issues.apache.org/jira/browse/MINIFICPP-1224
+ if (module_directory_.size()) {
+ logger_->log_error("Not supported property: Module Directory.");
+ }
+
+}
+
+void ExecutePythonProcessor::loadScriptFromFile(const std::string& file_path) {
+ std::ifstream file_handle(file_path);
+ if (!file_handle.is_open()) {
+ script_to_exec_ = "";
+ throw std::runtime_error("Failed to read Script File: " + file_path);
+ }
+ script_to_exec_ = std::string{
(std::istreambuf_iterator<char>(file_handle)),
(std::istreambuf_iterator<char>())};
Review comment:
Added extra padding.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]