This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f18e501fb0 [INLONG-10811][SDK] Fix callback function calls lead to coredump in python sdk (#10812)
f18e501fb0 is described below
commit f18e501fb08c294094f3a717089fa289ea009ed2
Author: yfsn666 <[email protected]>
AuthorDate: Tue Aug 20 17:10:08 2024 +0800
[INLONG-10811][SDK] Fix callback function calls lead to coredump in python sdk (#10812)
* [INLONG-10811][SDK] Fix callback function calls lead to coredump in python sdk
* [INLONG-10811][SDK] Fix the close_api waitms
* [INLONG-10811][SDK] Update pybind wrapper code
* [INLONG-10811][SDK] Update pybind wrapper code
* [INLONG-10811][SDK] Update pybind wrapper code
* [INLONG-10811][SDK] Update pybind wrapper code
---
.../dataproxy-sdk-python/CMakeLists.txt | 15 ++--
.../dataproxy-sdk-python/demo/send_demo.py | 6 +-
.../dataproxy-sdk-python/inlong_dataproxy.cpp | 85 ++++++++++++++--------
3 files changed, 66 insertions(+), 40 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt
index 80791b1c02..5f84a35b19 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt
@@ -22,17 +22,18 @@ project(dataproxy-sdk-python)
set(CMAKE_CXX_STANDARD 11)
-include_directories("./dataproxy-sdk-cpp/src/core")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/src/core")
-include_directories("./dataproxy-sdk-cpp/third_party/lib")
-include_directories("./dataproxy-sdk-cpp/third_party/lib64")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib64")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib")
add_subdirectory(pybind11)
-add_subdirectory(dataproxy-sdk-cpp)
-link_directories("./dataproxy-sdk-cpp/third_party/lib")
-link_directories("./dataproxy-sdk-cpp/third_party/lib64")
+link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib")
+link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib64")
+link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib")
pybind11_add_module(inlong_dataproxy inlong_dataproxy.cpp)
-target_link_libraries(inlong_dataproxy PRIVATE pybind11::module dataproxy_sdk)
+target_link_libraries(inlong_dataproxy PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib/dataproxy_sdk.a" liblog4cplusS.a libsnappy.a libcurl.a libssl.a libcrypto.a)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py
index e8c33bc23a..75974baa95 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py
@@ -40,7 +40,7 @@ def main():
# step1. init api
init_status = inlong_api.init_api(sys.argv[1])
if init_status:
- print("init error, error code is: " + init_status)
+ print("init error, error code is: " + str(init_status))
return
print("---->start sdk successfully")
@@ -62,12 +62,12 @@ def main():
for i in range(count):
send_status = inlong_api.send(inlong_group_id, inlong_stream_id, msg, len(msg), callback_func)
if send_status:
- print("tc_api_send error, error code is: " + send_status)
+ print("tc_api_send error, error code is: " + str(send_status))
# step3. close api
close_status = inlong_api.close_api(10000)
if close_status:
- print("close sdk error, error code is: " + close_status)
+ print("close sdk error, error code is: " + str(close_status))
else:
print("---->close sdk successfully")
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp
index 26c260f3f9..2fc5eeee83 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp
@@ -18,44 +18,69 @@
*/
#include <pybind11/pybind11.h>
+#include <pybind11/functional.h>
#include <pybind11/stl.h>
#include <inlong_api.h>
+#include <atomic>
+#include <thread>
+#include <iostream>
namespace py = pybind11;
-using namespace inlong;
-class PyInLongApi : public InLongApi {
-public:
- int32_t PySend(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, py::function callback_func) {
- py_callback = callback_func;
- return Send(inlong_group_id, inlong_stream_id, msg, msg_len, &PyInLongApi::CallbackFunc);
- }
-
-private:
- static py::function py_callback;
+std::map<inlong::UserCallBack, py::function> g_py_callbacks;
+std::atomic<bool> stop_callbacks(false);
+std::mutex callback_mutex;
- static int CallbackFunc(const char *a, const char *b, const char *c, int32_t d, const int64_t e, const char *f) {
- if (py_callback) {
- try {
- return py_callback(a, b, c, d, e, f).cast<int>();
- } catch (const py::error_already_set &e) {
- // Handle Python exception
- return -1;
- }
+int UserCallBackBridge(const char *a, const char *b, const char *c, int32_t d, const int64_t e, const char *f) {
+ std::unique_lock<std::mutex> lock(callback_mutex);
+ if (stop_callbacks) {
+ return -1;
+ }
+ auto it = g_py_callbacks.find(UserCallBackBridge);
+ if (it != g_py_callbacks.end()) {
+ if (stop_callbacks) {
+ return -1;
+ }
+ py::gil_scoped_acquire acquire;
+ if (stop_callbacks) {
+ return -1;
}
- return 0;
+ int result = it->second(a, b, c, d, e, f).cast<int>();
+ py::gil_scoped_release release;
+ return result;
}
-};
-
-py::function PyInLongApi::py_callback;
+ return -1;
+}
PYBIND11_MODULE(inlong_dataproxy, m) {
- m.doc() = "This module provides InLong dataproxy api to send message to InLong dataproxy.";
-
- py::class_<PyInLongApi>(m, "InLongApi")
+ m.doc() = "Python bindings for InLong SDK API";
+ py::class_<inlong::InLongApi>(m, "InLongApi")
.def(py::init<>())
- .def("init_api", &PyInLongApi::InitApi, py::arg("config_path"))
- .def("add_bid", &PyInLongApi::AddBid, py::arg("group_ids"))
- .def("send", &PyInLongApi::PySend, py::arg("inlong_group_id"), py::arg("inlong_stream_id"), py::arg("msg"), py::arg("msg_len"), py::arg("callback_func") = nullptr)
- .def("close_api", &PyInLongApi::CloseApi, py::arg("max_waitms"));
-}
+ .def("init_api", [](inlong::InLongApi& self, const char* config_path) {
+ stop_callbacks = false;
+ g_py_callbacks.clear();
+ py::gil_scoped_release release;
+ int result = self.InitApi(config_path);
+ return result;
+ })
+ .def("add_bid", &inlong::InLongApi::AddBid)
+ .def("send", [](inlong::InLongApi& self, const char* groupId, const char* streamId, const char* msg, int32_t msgLen, py::object pyCallback = py::none()) {
+ if (!pyCallback.is(py::none())) {
+ g_py_callbacks[UserCallBackBridge] = pyCallback.cast<py::function>();
+ py::gil_scoped_release release;
+ int result = self.Send(groupId, streamId, msg, msgLen, UserCallBackBridge);
+ return result;
+ } else {
+ int result = self.Send(groupId, streamId, msg, msgLen, nullptr);
+ return result;
+ }
+ })
+ .def("close_api", [](inlong::InLongApi& self, int32_t timeout_ms) {
+ py::gil_scoped_release release;
+ int result = self.CloseApi(timeout_ms);
+ stop_callbacks = true;
+ std::unique_lock<std::mutex> lock(callback_mutex);
+ py::gil_scoped_acquire acquire;
+ return result;
+ });
+}
\ No newline at end of file