This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 875bfdfc879bdef09be3c6cebeaa43fdce192ebb Author: Livio BenĨik <[email protected]> AuthorDate: Wed Jul 14 09:02:34 2021 +0200 [python-client] Fixed crash when using Python logger (#10981) ### Motivation In some cases, the Python client would crash when using the new `logger` option. This happens when a Pulsar message is sent asynchronously, but soon after the program exits (and even then, not always). For example, when doing Django migrations which include sending a message: ``` ... [2021-06-19 06:53:57.691] [INFO]: Created connection for pulsar://localhost:6650 [2021-06-19 06:53:57.693] [INFO]: [127.0.0.1:36536 -> 127.0.0.1:6650] Connected to broker [2021-06-19 06:53:57.695] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool [2021-06-19 06:53:57.707] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:36536 -> 127.0.0.1:6650] ... [2021-06-19 06:53:57.728] [DEBUG]: Sending message to topic ..... Applying dashboard.0001_initial... OK Applying templating.0001_initial... OK Error in sys.excepthook: Original exception was: Failed to migrate dashboard! Return code was: -6 ``` This happens because Pulsar tries to log messages after Python already started finalizing, so the client can't get a GIL lock, which crashes the whole client. ### Modifications Following the instructions at https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure, I added a check for when Python is finalizing, and if it is, we fallback to the default console logger (the log level is still respected correctly). Now it looks like this: ``` ... [2021-06-19 06:45:15.561] [INFO]: Created connection for pulsar://localhost:6650 [2021-06-19 06:45:15.563] [INFO]: [127.0.0.1:35930 -> 127.0.0.1:6650] Connected to broker [2021-06-19 06:45:15.568] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool [2021-06-19 06:45:15.586] [INFO]: [persistent://public/default/zaba-dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:35930 -> 127.0.0.1:6650] ... [2021-06-19 06:45:15.604] [DEBUG]: Sending message to topic ..... Applying dashboard.0001_initial... OK Applying templating.0001_initial... OK 2021-06-19 06:45:16.200 INFO [139853253269312] ClientConnection:1446 | [127.0.0.1:35930 -> 127.0.0.1:6650] Connection closed 2021-06-19 06:45:16.200 ERROR [139853099652672] ClientConnection:531 | [127.0.0.1:35930 -> 127.0.0.1:6650] Read failed: Operation canceled 2021-06-19 06:45:16.201 INFO [139853253269312] ClientConnection:261 | [127.0.0.1:35930 -> 127.0.0.1:6650] Destroyed connection 2021-06-19 06:45:16.201 INFO [139853253269312] ProducerImpl:561 | Producer - [persistent://public/default/dashboard-global_context-emit, standalone-0-120] , [batchMessageContainer = { BatchMessageContainer [size = 0] [bytes = 0] [maxSize = 1000] [maxBytes = 131072] [topicName = persistent://public/default/dashboard-global_context-emit] [numberOfBatchesSent_ = 1] [averageBatchSize_ = 1] }] Successfully migrated dashboard ``` (cherry picked from commit fc8ce64b1328945ab8e06aad56151294295f003a) --- pulsar-client-cpp/python/src/config.cc | 65 +++++++++++++++++----------------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index b665ec7..0b30713 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -17,6 +17,7 @@ * under the License. */ #include "utils.h" +#include <pulsar/ConsoleLoggerFactory.h> template<typename T> struct ListenerWrapper { @@ -90,6 +91,7 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC class LoggerWrapper: public Logger { PyObject* _pyLogger; + Logger* fallbackLogger; int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO); void _updateCurrentPythonLogLevel() { @@ -110,26 +112,19 @@ class LoggerWrapper: public Logger { public: - LoggerWrapper(const std::string &logger, PyObject* pyLogger) { + LoggerWrapper(const std::string &filename, PyObject* pyLogger) { _pyLogger = pyLogger; Py_XINCREF(_pyLogger); - _updateCurrentPythonLogLevel(); - } - - LoggerWrapper(const LoggerWrapper& other) { - _pyLogger = other._pyLogger; - Py_XINCREF(_pyLogger); - } + std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory()); + fallbackLogger = factory->getLogger(filename); - LoggerWrapper& operator=(const LoggerWrapper& other) { - _pyLogger = other._pyLogger; - Py_XINCREF(_pyLogger); - return *this; + _updateCurrentPythonLogLevel(); } virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); + delete fallbackLogger; } bool isEnabled(Level level) { @@ -137,34 +132,38 @@ class LoggerWrapper: public Logger { } void log(Level level, int line, const std::string& message) { - PyGILState_STATE state = PyGILState_Ensure(); - - try { - switch (level) { - case Logger::LEVEL_DEBUG: - py::call_method<void>(_pyLogger, "debug", message.c_str()); - break; - case Logger::LEVEL_INFO: - py::call_method<void>(_pyLogger, "info", message.c_str()); - break; - case Logger::LEVEL_WARN: - py::call_method<void>(_pyLogger, "warning", message.c_str()); - break; - case Logger::LEVEL_ERROR: - py::call_method<void>(_pyLogger, "error", message.c_str()); - break; + if (Py_IsInitialized() != true) { + // Python logger is unavailable - fallback to console logger + fallbackLogger->log(level, line, message); + } else { + PyGILState_STATE state = PyGILState_Ensure(); + + try { + switch (level) { + case Logger::LEVEL_DEBUG: + py::call_method<void>(_pyLogger, "debug", message.c_str()); + break; + case Logger::LEVEL_INFO: + py::call_method<void>(_pyLogger, "info", message.c_str()); + break; + case Logger::LEVEL_WARN: + py::call_method<void>(_pyLogger, "warning", message.c_str()); + break; + case Logger::LEVEL_ERROR: + py::call_method<void>(_pyLogger, "error", message.c_str()); + break; + } + + } catch (py::error_already_set e) { + PyErr_Print(); } - } catch (py::error_already_set e) { - PyErr_Print(); + PyGILState_Release(state); } - - PyGILState_Release(state); } }; class LoggerWrapperFactory : public LoggerFactory { - static LoggerWrapperFactory* _instance; PyObject* _pyLogger; public:
