This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f18e501fb0 [INLONG-10811][SDK] Fix callback function calls lead to coredump in python sdk (#10812)
f18e501fb0 is described below

commit f18e501fb08c294094f3a717089fa289ea009ed2
Author: yfsn666 <[email protected]>
AuthorDate: Tue Aug 20 17:10:08 2024 +0800

    [INLONG-10811][SDK] Fix callback function calls lead to coredump in python sdk (#10812)
    
    * [INLONG-10811][SDK] Fix callback function calls lead to coredump in python sdk
    
    * [INLONG-10811][SDK] Fix the close_api waitms
    
    * [INLONG-10811][SDK] Update pybind wrapper code
    
    * [INLONG-10811][SDK] Update pybind wrapper code
    
    * [INLONG-10811][SDK] Update pybind wrapper code
    
    * [INLONG-10811][SDK] Update pybind wrapper code
---
 .../dataproxy-sdk-python/CMakeLists.txt            | 15 ++--
 .../dataproxy-sdk-python/demo/send_demo.py         |  6 +-
 .../dataproxy-sdk-python/inlong_dataproxy.cpp      | 85 ++++++++++++++--------
 3 files changed, 66 insertions(+), 40 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt
index 80791b1c02..5f84a35b19 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt
@@ -22,17 +22,18 @@ project(dataproxy-sdk-python)
 
 set(CMAKE_CXX_STANDARD 11)
 
-include_directories("./dataproxy-sdk-cpp/src/core")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/src/core")
 
-include_directories("./dataproxy-sdk-cpp/third_party/lib")
-include_directories("./dataproxy-sdk-cpp/third_party/lib64")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib64")
+include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib")
 
 add_subdirectory(pybind11)
-add_subdirectory(dataproxy-sdk-cpp)
 
-link_directories("./dataproxy-sdk-cpp/third_party/lib")
-link_directories("./dataproxy-sdk-cpp/third_party/lib64")
+link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib")
+link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib64")
+link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib")
 
 pybind11_add_module(inlong_dataproxy inlong_dataproxy.cpp)
 
-target_link_libraries(inlong_dataproxy PRIVATE pybind11::module dataproxy_sdk)
+target_link_libraries(inlong_dataproxy PRIVATE 
"${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib/dataproxy_sdk.a" 
liblog4cplusS.a libsnappy.a libcurl.a libssl.a libcrypto.a)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py
index e8c33bc23a..75974baa95 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py
@@ -40,7 +40,7 @@ def main():
     # step1. init api
     init_status = inlong_api.init_api(sys.argv[1])
     if init_status:
-        print("init error, error code is: " + init_status)
+        print("init error, error code is: " + str(init_status))
         return
 
     print("---->start sdk successfully")
@@ -62,12 +62,12 @@ def main():
     for i in range(count):
         send_status = inlong_api.send(inlong_group_id, inlong_stream_id, msg, 
len(msg), callback_func)
         if send_status:
-            print("tc_api_send error, error code is: " + send_status)
+            print("tc_api_send error, error code is: " + str(send_status))
 
     # step3. close api
     close_status = inlong_api.close_api(10000)
     if close_status:
-        print("close sdk error, error code is: " + close_status)
+        print("close sdk error, error code is: " + str(close_status))
     else:
         print("---->close sdk successfully")
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp
index 26c260f3f9..2fc5eeee83 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp
@@ -18,44 +18,69 @@
  */
 
 #include <pybind11/pybind11.h>
+#include <pybind11/functional.h>
 #include <pybind11/stl.h>
 #include <inlong_api.h>
+#include <atomic>
+#include <thread>
+#include <iostream>
 
 namespace py = pybind11;
-using namespace inlong;
 
-class PyInLongApi : public InLongApi {
-public:
-    int32_t PySend(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len, py::function callback_func) {
-        py_callback = callback_func;
-        return Send(inlong_group_id, inlong_stream_id, msg, msg_len, 
&PyInLongApi::CallbackFunc);
-    }
-
-private:
-    static py::function py_callback;
+std::map<inlong::UserCallBack, py::function> g_py_callbacks;
+std::atomic<bool> stop_callbacks(false);
+std::mutex callback_mutex;
 
-    static int CallbackFunc(const char *a, const char *b, const char *c, 
int32_t d, const int64_t e, const char *f) {
-        if (py_callback) {
-            try {
-                return py_callback(a, b, c, d, e, f).cast<int>();
-            } catch (const py::error_already_set &e) {
-                // Handle Python exception
-                return -1;
-            }
+int UserCallBackBridge(const char *a, const char *b, const char *c, int32_t d, 
const int64_t e, const char *f) {
+    std::unique_lock<std::mutex> lock(callback_mutex);
+    if (stop_callbacks) {
+        return -1;
+    }
+    auto it = g_py_callbacks.find(UserCallBackBridge);
+    if (it != g_py_callbacks.end()) {
+        if (stop_callbacks) {
+            return -1;
+        }
+        py::gil_scoped_acquire acquire;
+        if (stop_callbacks) {
+            return -1;
         }
-        return 0;
+        int result = it->second(a, b, c, d, e, f).cast<int>();
+        py::gil_scoped_release release;
+        return result;
     }
-};
-
-py::function PyInLongApi::py_callback;
+    return -1;
+}
 
 PYBIND11_MODULE(inlong_dataproxy, m) {
-    m.doc() = "This module provides InLong dataproxy api to send message to 
InLong dataproxy.";
-
-    py::class_<PyInLongApi>(m, "InLongApi")
+    m.doc() = "Python bindings for InLong SDK API";
+    py::class_<inlong::InLongApi>(m, "InLongApi")
         .def(py::init<>())
-        .def("init_api", &PyInLongApi::InitApi, py::arg("config_path"))
-        .def("add_bid", &PyInLongApi::AddBid, py::arg("group_ids"))
-        .def("send", &PyInLongApi::PySend, py::arg("inlong_group_id"), 
py::arg("inlong_stream_id"), py::arg("msg"), py::arg("msg_len"), 
py::arg("callback_func") = nullptr)
-        .def("close_api", &PyInLongApi::CloseApi, py::arg("max_waitms"));
-}
+        .def("init_api", [](inlong::InLongApi& self, const char* config_path) {
+            stop_callbacks = false;
+            g_py_callbacks.clear();
+            py::gil_scoped_release release;
+            int result = self.InitApi(config_path);
+            return result;
+        })
+        .def("add_bid", &inlong::InLongApi::AddBid)
+        .def("send", [](inlong::InLongApi& self, const char* groupId, const 
char* streamId, const char* msg, int32_t msgLen, py::object pyCallback = 
py::none()) {
+            if (!pyCallback.is(py::none())) {
+                g_py_callbacks[UserCallBackBridge] = 
pyCallback.cast<py::function>();
+                py::gil_scoped_release release;
+                int result = self.Send(groupId, streamId, msg, msgLen, 
UserCallBackBridge);
+                return result;
+            } else {
+                int result = self.Send(groupId, streamId, msg, msgLen, 
nullptr);
+                return result;
+            }
+        })
+        .def("close_api", [](inlong::InLongApi& self, int32_t timeout_ms) {
+            py::gil_scoped_release release;
+            int result = self.CloseApi(timeout_ms);
+            stop_callbacks = true;
+            std::unique_lock<std::mutex> lock(callback_mutex);
+            py::gil_scoped_acquire acquire;
+            return result;
+        });
+}
\ No newline at end of file

Reply via email to