This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0d1402a Use readNextAsync for reader.read_next() (#125)
0d1402a is described below
commit 0d1402a522a8e1b01c1069adcbb58a0f6b27e077
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 24 16:57:19 2023 -0700
Use readNextAsync for reader.read_next() (#125)
---
src/reader.cc | 28 +---------------------------
1 file changed, 1 insertion(+), 27 deletions(-)
diff --git a/src/reader.cc b/src/reader.cc
index 0126f3f..7c66774 100644
--- a/src/reader.cc
+++ b/src/reader.cc
@@ -22,33 +22,7 @@
namespace py = pybind11;
Message Reader_readNext(Reader& reader) {
- Message msg;
- Result res;
-
- // TODO: There is currently no readNextAsync() version for the Reader.
- // Once that's available, we should also convert these ad-hoc loops.
- while (true) {
- Py_BEGIN_ALLOW_THREADS
- // Use 100ms timeout to periodically check whether the
- // interpreter was interrupted
- res = reader.readNext(msg, 100);
- Py_END_ALLOW_THREADS
-
- if (res != ResultTimeout) {
- // In case of timeout we keep calling receive() to simulate a
- // blocking call until a message is available, while breaking
- // every once in a while to check the Python signal status
- break;
- }
-
- if (PyErr_CheckSignals() == -1) {
- PyErr_SetInterrupt();
- return msg;
- }
- }
-
- CHECK_RESULT(res);
- return msg;
+ return waitForAsyncValue<Message>([&](ReadNextCallback callback) {
reader.readNextAsync(callback); });
}
Message Reader_readNextTimeout(Reader& reader, int timeoutMs) {