This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 004cede3e9040a08633c4900dbc11d2a12d53487 Author: Martin Zink <[email protected]> AuthorDate: Fri Apr 14 18:06:51 2023 +0200 MINIFICPP-2018 Add ProcessContext::getStateManager to Lua/Python Closes #1530 Signed-off-by: Marton Szasz <[email protected]> --- extensions/lua/LuaScriptEngine.cpp | 11 ++- extensions/lua/LuaScriptEngine.h | 8 -- extensions/lua/LuaScriptProcessContext.cpp | 8 +- extensions/lua/LuaScriptProcessContext.h | 6 +- extensions/lua/LuaScriptStateManager.cpp | 54 +++++++++++ ...iptProcessContext.h => LuaScriptStateManager.h} | 14 +-- .../TestExecuteScriptProcessorWithLuaScript.cpp | 28 ++++++ extensions/python/PythonBindings.cpp | 22 +++-- .../TestExecuteScriptProcessorWithPythonScript.cpp | 26 ++++++ extensions/python/types/PyProcessContext.cpp | 12 +++ extensions/python/types/PyProcessContext.h | 1 + extensions/python/types/PyStateManager.cpp | 103 +++++++++++++++++++++ .../types/{PyProcessContext.h => PyStateManager.h} | 19 ++-- extensions/python/types/Types.h | 7 ++ 14 files changed, 281 insertions(+), 38 deletions(-) diff --git a/extensions/lua/LuaScriptEngine.cpp b/extensions/lua/LuaScriptEngine.cpp index 083d0e48a..047c085fb 100644 --- a/extensions/lua/LuaScriptEngine.cpp +++ b/extensions/lua/LuaScriptEngine.cpp @@ -61,6 +61,13 @@ LuaScriptEngine::LuaScriptEngine() { lua_.new_usertype<LuaOutputStream>( "OutputStream", "write", &LuaOutputStream::write); + lua_.new_usertype<LuaScriptProcessContext>( + "ProcessContext", + "getStateManager", &LuaScriptProcessContext::getStateManager); + lua_.new_usertype<LuaScriptStateManager>( + "StateManager", + "set", &LuaScriptStateManager::set, + "get", &LuaScriptStateManager::get); } void LuaScriptEngine::executeScriptWithAppendedModulePaths(std::string& script) { @@ -128,8 +135,8 @@ class TriggerSession { } // namespace void LuaScriptEngine::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { - auto script_context = convert(context); - auto lua_session = convert(session); + auto script_context = std::make_shared<LuaScriptProcessContext>(context, lua_); + auto lua_session = std::make_shared<LuaProcessSession>(session); TriggerSession trigger_session(script_context, lua_session); call("onTrigger", script_context, lua_session); } diff --git a/extensions/lua/LuaScriptEngine.h b/extensions/lua/LuaScriptEngine.h index 0d2c7e059..8840448cd 100644 --- a/extensions/lua/LuaScriptEngine.h +++ b/extensions/lua/LuaScriptEngine.h @@ -72,14 +72,6 @@ class LuaScriptEngine { return value; } - static std::shared_ptr<LuaScriptProcessContext> convert(const std::shared_ptr<core::ProcessContext>& context) { - return std::make_shared<LuaScriptProcessContext>(context); - } - - static std::shared_ptr<LuaProcessSession> convert(const std::shared_ptr<core::ProcessSession>& session) { - return std::make_shared<LuaProcessSession>(session); - } - private: void executeScriptWithAppendedModulePaths(std::string& script); diff --git a/extensions/lua/LuaScriptProcessContext.cpp b/extensions/lua/LuaScriptProcessContext.cpp index 2f6522827..38a1775c3 100644 --- a/extensions/lua/LuaScriptProcessContext.cpp +++ b/extensions/lua/LuaScriptProcessContext.cpp @@ -24,8 +24,8 @@ namespace org::apache::nifi::minifi::extensions::lua { -LuaScriptProcessContext::LuaScriptProcessContext(std::shared_ptr<core::ProcessContext> context) - : context_(std::move(context)) { +LuaScriptProcessContext::LuaScriptProcessContext(std::shared_ptr<core::ProcessContext> context, sol::state& sol_state) + : context_(std::move(context)), sol_state_(sol_state) { } std::string LuaScriptProcessContext::getProperty(const std::string &name) { @@ -38,4 +38,8 @@ void LuaScriptProcessContext::releaseProcessContext() { context_.reset(); } +LuaScriptStateManager LuaScriptProcessContext::getStateManager() { + return LuaScriptStateManager(context_->getStateManager(), sol_state_); +} + } // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/LuaScriptProcessContext.h b/extensions/lua/LuaScriptProcessContext.h index 72dd161b9..c5eeb1974 100644 --- a/extensions/lua/LuaScriptProcessContext.h +++ b/extensions/lua/LuaScriptProcessContext.h @@ -22,18 +22,22 @@ #include <memory> #include "core/ProcessSession.h" +#include "LuaScriptStateManager.h" namespace org::apache::nifi::minifi::extensions::lua { class LuaScriptProcessContext { public: - explicit LuaScriptProcessContext(std::shared_ptr<core::ProcessContext> context); + explicit LuaScriptProcessContext(std::shared_ptr<core::ProcessContext> context, sol::state& sol_state); std::string getProperty(const std::string &name); void releaseProcessContext(); + LuaScriptStateManager getStateManager(); + private: std::shared_ptr<core::ProcessContext> context_; + sol::state& sol_state_; }; } // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/LuaScriptStateManager.cpp b/extensions/lua/LuaScriptStateManager.cpp new file mode 100644 index 000000000..e559ed2b9 --- /dev/null +++ b/extensions/lua/LuaScriptStateManager.cpp @@ -0,0 +1,54 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LuaScriptStateManager.h" + +namespace org::apache::nifi::minifi::extensions::lua { + +namespace { +core::StateManager::State from_lua(const sol::table& lua_state) { + core::StateManager::State cpp_state; + for (const auto& [lua_state_key, lua_state_value] : lua_state) + cpp_state[lua_state_key.as<std::string>()] = lua_state_value.as<std::string>(); + return cpp_state; +} + +sol::table to_lua(const core::StateManager::State& cpp_state, sol::state& sol_state) { + auto lua_state = sol::table(sol_state.lua_state(), sol::create); + for (const auto& [cpp_state_key, cpp_state_value] : cpp_state) + lua_state[cpp_state_key] = cpp_state_value; + return lua_state; +} +} // namespace + +bool LuaScriptStateManager::set(const sol::table& core_component_state_lua) { + if (!state_manager_) + return false; + + return state_manager_->set(from_lua(core_component_state_lua)); +} + +sol::optional<sol::table> LuaScriptStateManager::get() { + if (!state_manager_) + return sol::nullopt; + if (auto core_component_state_cpp = state_manager_->get()) + return to_lua(*core_component_state_cpp, sol_state_); + return sol::nullopt; +} + +} // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/LuaScriptProcessContext.h b/extensions/lua/LuaScriptStateManager.h similarity index 72% copy from extensions/lua/LuaScriptProcessContext.h copy to extensions/lua/LuaScriptStateManager.h index 72dd161b9..a9e8ce47f 100644 --- a/extensions/lua/LuaScriptProcessContext.h +++ b/extensions/lua/LuaScriptStateManager.h @@ -21,19 +21,21 @@ #include <string> #include <memory> -#include "core/ProcessSession.h" +#include "sol/sol.hpp" +#include "core/StateManager.h" namespace org::apache::nifi::minifi::extensions::lua { -class LuaScriptProcessContext { +class LuaScriptStateManager { public: - explicit LuaScriptProcessContext(std::shared_ptr<core::ProcessContext> context); + explicit LuaScriptStateManager(core::StateManager* state_manager, sol::state& sol_state) : state_manager_(state_manager), sol_state_(sol_state) {} - std::string getProperty(const std::string &name); - void releaseProcessContext(); + bool set(const sol::table& core_component_state_lua); + sol::optional<sol::table> get(); private: - std::shared_ptr<core::ProcessContext> context_; + core::StateManager* state_manager_; + sol::state& sol_state_; }; } // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/tests/TestExecuteScriptProcessorWithLuaScript.cpp b/extensions/lua/tests/TestExecuteScriptProcessorWithLuaScript.cpp index 1e5902224..335000f03 100644 --- a/extensions/lua/tests/TestExecuteScriptProcessorWithLuaScript.cpp +++ b/extensions/lua/tests/TestExecuteScriptProcessorWithLuaScript.cpp @@ -319,4 +319,32 @@ TEST_CASE("Lua can remove flowfiles", "[ExecuteScript]") { REQUIRE(result.at(ExecuteScript::Failure).empty()); } +TEST_CASE("Lua can store states in StateManager", "[ExecuteScript]") { + const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript"); + + minifi::test::SingleProcessorTestController controller{execute_script}; + LogTestController::getInstance().setTrace<minifi::processors::ExecuteScript>(); + execute_script->setProperty(ExecuteScript::ScriptEngine, "lua"); + execute_script->setProperty(ExecuteScript::ScriptBody.getName(), + R"( + function onTrigger(context, session) + state_manager = context:getStateManager() + state = state_manager:get() + if state == nil then + state = {} + state['lua_trigger_count'] = 0 + end + lua_trigger_count = state['lua_trigger_count'] + log:info('lua_trigger_count: ' .. lua_trigger_count) + state['lua_trigger_count'] = tostring(tonumber(lua_trigger_count) + 1) + state_manager:set(state) + end + )"); + + for (size_t i = 0; i < 4; ++i) { + controller.trigger(); + CHECK(LogTestController::getInstance().contains(fmt::format("lua_trigger_count: {}", i))); + } +} + } // namespace org::apache::nifi::minifi::processors::test diff --git a/extensions/python/PythonBindings.cpp b/extensions/python/PythonBindings.cpp index 8d57a128f..7ac7e2767 100644 --- a/extensions/python/PythonBindings.cpp +++ b/extensions/python/PythonBindings.cpp @@ -25,6 +25,7 @@ #include "types/PyRelationship.h" #include "types/PyInputStream.h" #include "types/PyOutputStream.h" +#include "types/PyStateManager.h" namespace org::apache::nifi::minifi::extensions::python { extern "C" { @@ -43,16 +44,17 @@ struct PyModuleDef minifi_module = { PyMODINIT_FUNC PyInit_minifi_native(void) { - const std::array<std::pair<PyTypeObject*, std::string_view>, 8> types = { - std::make_pair(PyLogger::typeObject(), "Logger"), - std::make_pair(PyProcessSessionObject::typeObject(), "ProcessSession"), - std::make_pair(PyProcessContext::typeObject(), "ProcessContext"), - std::make_pair(PyProcessor::typeObject(), "Processor"), - std::make_pair(PyScriptFlowFile::typeObject(), "FlowFile"), - std::make_pair(PyRelationship::typeObject(), "Relationship"), - std::make_pair(PyInputStream::typeObject(), "InputStream"), - std::make_pair(PyOutputStream::typeObject(), "OutputStream") - }; + const std::array types = std::to_array<std::pair<PyTypeObject*, std::string_view>>({ + std::make_pair(PyLogger::typeObject(), "Logger"), + std::make_pair(PyProcessSessionObject::typeObject(), "ProcessSession"), + std::make_pair(PyProcessContext::typeObject(), "ProcessContext"), + std::make_pair(PyProcessor::typeObject(), "Processor"), + std::make_pair(PyScriptFlowFile::typeObject(), "FlowFile"), + std::make_pair(PyRelationship::typeObject(), "Relationship"), + std::make_pair(PyInputStream::typeObject(), "InputStream"), + std::make_pair(PyOutputStream::typeObject(), "OutputStream"), + std::make_pair(PyStateManager::typeObject(), "StateManager") + }); for (const auto& type : types) { if (PyType_Ready(type.first) < 0) { diff --git a/extensions/python/tests/TestExecuteScriptProcessorWithPythonScript.cpp b/extensions/python/tests/TestExecuteScriptProcessorWithPythonScript.cpp index 90236b763..2b3ef0a98 100644 --- a/extensions/python/tests/TestExecuteScriptProcessorWithPythonScript.cpp +++ b/extensions/python/tests/TestExecuteScriptProcessorWithPythonScript.cpp @@ -262,4 +262,30 @@ def onTrigger(context, session): REQUIRE(result.at(ExecuteScript::Failure).empty()); } +TEST_CASE("Python can store states in StateManager", "[ExecuteScript]") { + const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript"); + + minifi::test::SingleProcessorTestController controller{execute_script}; + LogTestController::getInstance().setTrace<minifi::processors::ExecuteScript>(); + execute_script->setProperty(ExecuteScript::ScriptEngine, "python"); + execute_script->setProperty(ExecuteScript::ScriptBody.getName(), + R"( +def onTrigger(context, session): + state_manager = context.getStateManager() + state = state_manager.get() + if state is None: + state = {} + state['python_trigger_count'] = 0 + python_trigger_count = state['python_trigger_count'] + log.info('python_trigger_count: ' + str(python_trigger_count)) + state['python_trigger_count'] =str(int(python_trigger_count) + 1) + state_manager.set(state) +)"); + + for (size_t i = 0; i < 4; ++i) { + controller.trigger(); + CHECK(LogTestController::getInstance().contains(fmt::format("python_trigger_count: {}", i))); + } +} + } // namespace org::apache::nifi::minifi::processors::test diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 0463b8ebe..899b54544 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -16,6 +16,7 @@ a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ #include "PyProcessContext.h" +#include "PyStateManager.h" #include <string> #include "PyException.h" @@ -24,6 +25,7 @@ namespace org::apache::nifi::minifi::extensions::python { static PyMethodDef PyProcessContext_methods[] = { {"getProperty", (PyCFunction) PyProcessContext::getProperty, METH_VARARGS, nullptr}, + {"getStateManager", (PyCFunction) PyProcessContext::getStateManager, METH_VARARGS, nullptr}, {} /* Sentinel */ }; @@ -72,6 +74,16 @@ PyObject* PyProcessContext::getProperty(PyProcessContext* self, PyObject* args) return object::returnReference(value); } +PyObject* PyProcessContext::getStateManager(PyProcessContext* self, PyObject*) { + auto context = self->process_context_.lock(); + if (!context) { + PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); + return nullptr; + } + + return object::returnReference(context->getStateManager()); +} + PyTypeObject* PyProcessContext::typeObject() { static OwnedObject PyProcessContextType{PyType_FromSpec(&PyProcessContextTypeSpec)}; return reinterpret_cast<PyTypeObject*>(PyProcessContextType.get()); diff --git a/extensions/python/types/PyProcessContext.h b/extensions/python/types/PyProcessContext.h index 1245f6049..decf67478 100644 --- a/extensions/python/types/PyProcessContext.h +++ b/extensions/python/types/PyProcessContext.h @@ -34,6 +34,7 @@ struct PyProcessContext { static int init(PyProcessContext* self, PyObject* args, PyObject* kwds); static PyObject* getProperty(PyProcessContext* self, PyObject* args); + static PyObject* getStateManager(PyProcessContext* self, PyObject* args); static PyTypeObject* typeObject(); }; diff --git a/extensions/python/types/PyStateManager.cpp b/extensions/python/types/PyStateManager.cpp new file mode 100644 index 000000000..fce6eb2e0 --- /dev/null +++ b/extensions/python/types/PyStateManager.cpp @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, +a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PyStateManager.h" +#include <string> +#include "PyException.h" + +extern "C" { +namespace org::apache::nifi::minifi::extensions::python { + +static PyMethodDef PyStateManager_methods[] = { + {"get", (PyCFunction) PyStateManager::get, METH_VARARGS, nullptr}, + {"set", (PyCFunction) PyStateManager::set, METH_VARARGS, nullptr}, + {} /* Sentinel */ +}; + +static PyType_Slot PyStateManagerTypeSpecSlots[] = { + {Py_tp_dealloc, reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyStateManager>)}, + {Py_tp_init, reinterpret_cast<void*>(PyStateManager::init)}, + {Py_tp_methods, reinterpret_cast<void*>(PyStateManager_methods)}, + {Py_tp_new, reinterpret_cast<void*>(newPythonAllocatedInstance<PyStateManager>)}, + {} /* Sentinel */ +}; + +static PyType_Spec PyStateManagerTypeSpec{ + .name = "minifi_native.StateManager", + .basicsize = sizeof(PyStateManager), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT, + .slots = PyStateManagerTypeSpecSlots +}; + +int PyStateManager::init(PyStateManager* self, PyObject* args, PyObject*) { + PyObject* weak_ptr_capsule = nullptr; + if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) { + return -1; + } + + auto process_context = PyCapsule_GetPointer(weak_ptr_capsule, HeldTypeName); + if (!process_context) + throw PyException(); + self->state_manager_ = *static_cast<HeldType*>(process_context); + return 0; +} + +PyObject* PyStateManager::set(PyStateManager* self, PyObject* args) { + if (!self->state_manager_) { + PyErr_SetString(PyExc_AttributeError, "tried reading state manager outside 'on_trigger'"); + return nullptr; + } + core::StateManager::State cpp_state; + + auto python_state = BorrowedDict::fromTuple(args, 0); + + auto python_state_keys = OwnedList(PyDict_Keys(python_state.get())); + for (size_t i = 0; i < python_state_keys.length(); ++i) { + BorrowedStr key{python_state_keys[i]}; + if (auto value = python_state[key.toUtf8String()]) { + BorrowedStr value_str{*value}; + cpp_state[key.toUtf8String()] = value_str.toUtf8String(); + } + } + + return object::returnReference(self->state_manager_->set(cpp_state)); +} + +PyObject* PyStateManager::get(PyStateManager* self, PyObject*) { + if (!self->state_manager_) { + PyErr_SetString(PyExc_AttributeError, "tried reading state manager outside 'on_trigger'"); + return nullptr; + } + if (auto cpp_state = self->state_manager_->get()) { + auto python_state = OwnedDict::create(); + for (const auto& [cpp_state_key, cpp_state_value] : *cpp_state) { + python_state.put(cpp_state_key, cpp_state_value); + } + return object::returnReference(python_state); + } else { + Py_RETURN_NONE; + } +} + +PyTypeObject* PyStateManager::typeObject() { + static OwnedObject PyStateManagerType{PyType_FromSpec(&PyStateManagerTypeSpec)}; + return reinterpret_cast<PyTypeObject*>(PyStateManagerType.get()); +} + +} // namespace org::apache::nifi::minifi::extensions::python +} // extern "C" diff --git a/extensions/python/types/PyProcessContext.h b/extensions/python/types/PyStateManager.h similarity index 68% copy from extensions/python/types/PyProcessContext.h copy to extensions/python/types/PyStateManager.h index 1245f6049..7e0c71873 100644 --- a/extensions/python/types/PyProcessContext.h +++ b/extensions/python/types/PyStateManager.h @@ -18,28 +18,29 @@ #include <memory> -#include "core/ProcessContext.h" +#include "core/StateManager.h" #include "../PythonBindings.h" namespace org::apache::nifi::minifi::extensions::python { -struct PyProcessContext { - PyProcessContext() {} - using HeldType = std::weak_ptr<core::ProcessContext>; - static constexpr const char* HeldTypeName = "PyProcessContext::HeldType"; +struct PyStateManager { + PyStateManager() {} + using HeldType = core::StateManager*; + static constexpr const char* HeldTypeName = "PyStateManager::HeldType"; PyObject_HEAD - HeldType process_context_; + HeldType state_manager_; - static int init(PyProcessContext* self, PyObject* args, PyObject* kwds); + static int init(PyStateManager* self, PyObject* args, PyObject* kwds); - static PyObject* getProperty(PyProcessContext* self, PyObject* args); + static PyObject* set(PyStateManager* self, PyObject* args); + static PyObject* get(PyStateManager* self, PyObject* args); static PyTypeObject* typeObject(); }; namespace object { template<> -struct Converter<PyProcessContext::HeldType> : public HolderTypeConverter<PyProcessContext> {}; +struct Converter<PyStateManager::HeldType> : public HolderTypeConverter<PyStateManager> {}; } // namespace object } // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/types/Types.h b/extensions/python/types/Types.h index 292059c7f..97203a6bb 100644 --- a/extensions/python/types/Types.h +++ b/extensions/python/types/Types.h @@ -279,6 +279,13 @@ class Dict : public ReferenceHolder<reference_type> { auto item = BorrowedReference(PyDict_GetItemString(this->ref_.get(), key.data())); return item ? std::optional{item} : std::nullopt; } + + static BorrowedDict fromTuple(PyObject* tuple, Py_ssize_t location) requires(reference_type == ReferenceType::BORROWED) { + BorrowedDict dict_from_tuple{PyTuple_GetItem(tuple, location)}; + if (dict_from_tuple.get() == nullptr) + throw PyException(); + return dict_from_tuple; + } }; namespace callable {
