This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push: new ec05f50 Wrap the interruption to a custom exception when a blocking API is interrupted (#99) ec05f50 is described below commit ec05f50bf489aef85532d61f577c62649a5b71a6 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Wed Mar 8 00:03:10 2023 +0800 Wrap the interruption to a custom exception when a blocking API is interrupted (#99) ### Motivation Currently, when a blocking API is interrupted by a signal, a `SystemError` is raised. However, in this case `PyErr_SetInterrupt` is called, and the next time a blocking API is called, a `std::system_error` is somehow thrown. The failure reported in https://lists.apache.org/thread/cmzykd9qz9x1d0s35nc5912o3slwpxpv is caused by this issue: the `SystemError` is not caught, so `client.close()` is skipped, which leads to the `bad_weak_ptr` error. P.S. Currently we have to call `client.close()` on a `Client` instance; otherwise, `bad_weak_ptr` will be thrown. However, even if we catch the `SystemError` like this: ```python try: msg = consumer.receive() # ... except SystemError: break ``` we would still see the following error: ``` terminate called after throwing an instance of 'std::system_error' what(): Operation not permitted Aborted ``` ### Modifications - Wrap `ResultInterrupted` in the `pulsar.Interrupted` exception. - Refactor the `waitForAsyncValue` and `waitForAsyncResult` functions and raise `pulsar.Interrupted` when `PyErr_CheckSignals` detects a signal. - Add `InterruptedTest` to cover this case. - Remove `future.h` since we now use `std::future` instead of the manually implemented `Future`. - Fix `examples/consumer.py` to support stopping with Ctrl+C. 
--- examples/consumer.py | 10 +- pulsar/exceptions.py | 2 +- src/client.cc | 61 ++------ src/consumer.cc | 23 +-- src/future.h | 181 ---------------------- src/producer.cc | 14 +- src/reader.cc | 10 +- src/utils.cc | 37 ++--- src/utils.h | 62 ++------ examples/consumer.py => tests/interrupted_test.py | 39 +++-- tests/run-unit-tests.sh | 1 + 11 files changed, 97 insertions(+), 343 deletions(-) diff --git a/examples/consumer.py b/examples/consumer.py index 8c2985e..d698f48 100755 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -29,8 +29,12 @@ consumer = client.subscribe('my-topic', "my-subscription", }) while True: - msg = consumer.receive() - print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id())) - consumer.acknowledge(msg) + try: + msg = consumer.receive() + print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id())) + consumer.acknowledge(msg) + except pulsar.Interrupted: + print("Stop receiving messages") + break client.close() diff --git a/pulsar/exceptions.py b/pulsar/exceptions.py index d151564..1b425c8 100644 --- a/pulsar/exceptions.py +++ b/pulsar/exceptions.py @@ -25,4 +25,4 @@ from _pulsar import PulsarException, UnknownError, InvalidConfiguration, Timeout ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \ ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \ CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \ - NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull + NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull, Interrupted diff --git a/src/client.cc b/src/client.cc index 206c4e2..0103309 100644 --- a/src/client.cc +++ b/src/client.cc @@ -24,73 +24,38 @@ namespace py = pybind11; Producer 
Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) { - Producer producer; - - waitForAsyncValue(std::function<void(CreateProducerCallback)>([&](CreateProducerCallback callback) { - client.createProducerAsync(topic, conf, callback); - }), - producer); - - return producer; + return waitForAsyncValue<Producer>( + [&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); }); } Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf) { - Consumer consumer; - - waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) { - client.subscribeAsync(topic, subscriptionName, conf, callback); - }), - consumer); - - return consumer; + return waitForAsyncValue<Consumer>( + [&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); }); } Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf) { - Consumer consumer; - - waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) { - client.subscribeAsync(topics, subscriptionName, conf, callback); - }), - consumer); - - return consumer; + return waitForAsyncValue<Consumer>( + [&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); }); } Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, const ConsumerConfiguration& conf) { - Consumer consumer; - - waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) { - client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback); - }), - consumer); - - return consumer; + return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) { + 
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback); + }); } Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf) { - Reader reader; - - waitForAsyncValue(std::function<void(ReaderCallback)>([&](ReaderCallback callback) { - client.createReaderAsync(topic, startMessageId, conf, callback); - }), - reader); - - return reader; + return waitForAsyncValue<Reader>( + [&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); }); } std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) { - std::vector<std::string> partitions; - - waitForAsyncValue(std::function<void(GetPartitionsCallback)>([&](GetPartitionsCallback callback) { - client.getPartitionsForTopicAsync(topic, callback); - }), - partitions); - - return partitions; + return waitForAsyncValue<std::vector<std::string>>( + [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); }); } void Client_close(Client& client) { diff --git a/src/consumer.cc b/src/consumer.cc index 972bd0b..4b44775 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -29,13 +29,7 @@ void Consumer_unsubscribe(Consumer& consumer) { } Message Consumer_receive(Consumer& consumer) { - Message msg; - - waitForAsyncValue(std::function<void(ReceiveCallback)>( - [&consumer](ReceiveCallback callback) { consumer.receiveAsync(callback); }), - msg); - - return msg; + return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); }); } Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { @@ -59,32 +53,27 @@ Messages Consumer_batch_receive(Consumer& consumer) { void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); } void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { - Py_BEGIN_ALLOW_THREADS - 
consumer.acknowledgeAsync(msgId, nullptr); + Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr); Py_END_ALLOW_THREADS } void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { - Py_BEGIN_ALLOW_THREADS - consumer.negativeAcknowledge(msg); + Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg); Py_END_ALLOW_THREADS } void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { - Py_BEGIN_ALLOW_THREADS - consumer.negativeAcknowledge(msgId); + Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId); Py_END_ALLOW_THREADS } void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) { - Py_BEGIN_ALLOW_THREADS - consumer.acknowledgeCumulativeAsync(msg, nullptr); + Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr); Py_END_ALLOW_THREADS } void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) { - Py_BEGIN_ALLOW_THREADS - consumer.acknowledgeCumulativeAsync(msgId, nullptr); + Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr); Py_END_ALLOW_THREADS } diff --git a/src/future.h b/src/future.h deleted file mode 100644 index 6754c89..0000000 --- a/src/future.h +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#ifndef LIB_FUTURE_H_ -#define LIB_FUTURE_H_ - -#include <functional> -#include <mutex> -#include <memory> -#include <condition_variable> - -#include <list> - -typedef std::unique_lock<std::mutex> Lock; - -namespace pulsar { - -template <typename Result, typename Type> -struct InternalState { - std::mutex mutex; - std::condition_variable condition; - Result result; - Type value; - bool complete; - - std::list<typename std::function<void(Result, const Type&)> > listeners; -}; - -template <typename Result, typename Type> -class Future { - public: - typedef std::function<void(Result, const Type&)> ListenerCallback; - - Future& addListener(ListenerCallback callback) { - InternalState<Result, Type>* state = state_.get(); - Lock lock(state->mutex); - - if (state->complete) { - lock.unlock(); - callback(state->result, state->value); - } else { - state->listeners.push_back(callback); - } - - return *this; - } - - Result get(Type& result) { - InternalState<Result, Type>* state = state_.get(); - Lock lock(state->mutex); - - if (!state->complete) { - // Wait for result - while (!state->complete) { - state->condition.wait(lock); - } - } - - result = state->value; - return state->result; - } - - template <typename Duration> - bool get(Result& res, Type& value, Duration d) { - InternalState<Result, Type>* state = state_.get(); - Lock lock(state->mutex); - - if (!state->complete) { - // Wait for result - while (!state->complete) { - if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) { - // Timeout while waiting for the future to complete - return false; - } - } - } - - value = state->value; - res = state->result; - return true; - } - - private: - typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr; - Future(InternalStatePtr state) : state_(state) {} - - std::shared_ptr<InternalState<Result, Type> > state_; - - template 
<typename U, typename V> - friend class Promise; -}; - -template <typename Result, typename Type> -class Promise { - public: - Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {} - - bool setValue(const Type& value) const { - static Result DEFAULT_RESULT; - InternalState<Result, Type>* state = state_.get(); - Lock lock(state->mutex); - - if (state->complete) { - return false; - } - - state->value = value; - state->result = DEFAULT_RESULT; - state->complete = true; - - decltype(state->listeners) listeners; - listeners.swap(state->listeners); - - lock.unlock(); - - for (auto& callback : listeners) { - callback(DEFAULT_RESULT, value); - } - - state->condition.notify_all(); - return true; - } - - bool setFailed(Result result) const { - static Type DEFAULT_VALUE; - InternalState<Result, Type>* state = state_.get(); - Lock lock(state->mutex); - - if (state->complete) { - return false; - } - - state->result = result; - state->complete = true; - - decltype(state->listeners) listeners; - listeners.swap(state->listeners); - - lock.unlock(); - - for (auto& callback : listeners) { - callback(result, DEFAULT_VALUE); - } - - state->condition.notify_all(); - return true; - } - - bool isComplete() const { - InternalState<Result, Type>* state = state_.get(); - Lock lock(state->mutex); - return state->complete; - } - - Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); } - - private: - typedef std::function<void(Result, const Type&)> ListenerCallback; - std::shared_ptr<InternalState<Result, Type> > state_; -}; - -class Void {}; - -} /* namespace pulsar */ - -#endif /* LIB_FUTURE_H_ */ diff --git a/src/producer.cc b/src/producer.cc index 1dd5a76..7027185 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -25,21 +25,15 @@ namespace py = pybind11; MessageId Producer_send(Producer& producer, const Message& message) { - MessageId messageId; - - waitForAsyncValue(std::function<void(SendCallback)>( - [&](SendCallback callback) { 
producer.sendAsync(message, callback); }), - messageId); - - return messageId; + return waitForAsyncValue<MessageId>( + [&](SendCallback callback) { producer.sendAsync(message, callback); }); } void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) { - Py_BEGIN_ALLOW_THREADS - producer.sendAsync(msg, callback); + Py_BEGIN_ALLOW_THREADS producer.sendAsync(msg, callback); Py_END_ALLOW_THREADS - if (PyErr_CheckSignals() == -1) { + if (PyErr_CheckSignals() == -1) { PyErr_SetInterrupt(); } } diff --git a/src/reader.cc b/src/reader.cc index 7194c29..0126f3f 100644 --- a/src/reader.cc +++ b/src/reader.cc @@ -62,14 +62,8 @@ Message Reader_readNextTimeout(Reader& reader, int timeoutMs) { } bool Reader_hasMessageAvailable(Reader& reader) { - bool available = false; - - waitForAsyncValue( - std::function<void(HasMessageAvailableCallback)>( - [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); }), - available); - - return available; + return waitForAsyncValue<bool>( + [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); }); } void Reader_close(Reader& reader) { diff --git a/src/utils.cc b/src/utils.cc index cf8f6f4..8ebc3f9 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -20,28 +20,29 @@ #include "utils.h" void waitForAsyncResult(std::function<void(ResultCallback)> func) { - Result res = ResultOk; - bool b; - Promise<bool, Result> promise; - Future<bool, Result> future = promise.getFuture(); + auto promise = std::make_shared<std::promise<Result>>(); + func([promise](Result result) { promise->set_value(result); }); + internal::waitForResult(*promise); +} - Py_BEGIN_ALLOW_THREADS func(WaitForCallback(promise)); - Py_END_ALLOW_THREADS +namespace internal { - bool isComplete; +void waitForResult(std::promise<pulsar::Result>& promise) { + auto future = promise.get_future(); while (true) { - // Check periodically for Python signals - Py_BEGIN_ALLOW_THREADS isComplete = future.get(b, 
std::ref(res), std::chrono::milliseconds(100)); - Py_END_ALLOW_THREADS - - if (isComplete) { - CHECK_RESULT(res); - return; + { + py::gil_scoped_release release; + auto status = future.wait_for(std::chrono::milliseconds(100)); + if (status == std::future_status::ready) { + CHECK_RESULT(future.get()); + return; + } } - - if (PyErr_CheckSignals() == -1) { - PyErr_SetInterrupt(); - return; + py::gil_scoped_acquire acquire; + if (PyErr_CheckSignals() != 0) { + raiseException(ResultInterrupted); } } } + +} // namespace internal diff --git a/src/utils.h b/src/utils.h index fb700c6..bbe202e 100644 --- a/src/utils.h +++ b/src/utils.h @@ -21,12 +21,14 @@ #include <pulsar/Client.h> #include <pulsar/MessageBatch.h> +#include <chrono> #include <exception> -#include <Python.h> +#include <future> +#include <pybind11/pybind11.h> #include "exceptions.h" -#include "future.h" using namespace pulsar; +namespace py = pybind11; inline void CHECK_RESULT(Result res) { if (res != ResultOk) { @@ -34,56 +36,26 @@ inline void CHECK_RESULT(Result res) { } } -struct WaitForCallback { - Promise<bool, Result> m_promise; +namespace internal { - WaitForCallback(Promise<bool, Result> promise) : m_promise(promise) {} +void waitForResult(std::promise<pulsar::Result>& promise); - void operator()(Result result) { m_promise.setValue(result); } -}; - -template <typename T> -struct WaitForCallbackValue { - Promise<Result, T>& m_promise; - - WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {} - - void operator()(Result result, const T& value) { - if (result == ResultOk) { - m_promise.setValue(value); - } else { - m_promise.setFailed(result); - } - } -}; +} // namespace internal void waitForAsyncResult(std::function<void(ResultCallback)> func); -template <typename T, typename Callback> -inline void waitForAsyncValue(std::function<void(Callback)> func, T& value) { - Result res = ResultOk; - Promise<Result, T> promise; - Future<Result, T> future = promise.getFuture(); - - 
Py_BEGIN_ALLOW_THREADS func(WaitForCallbackValue<T>(promise)); - Py_END_ALLOW_THREADS +template <typename T> +inline T waitForAsyncValue(std::function<void(std::function<void(Result, const T&)>)> func) { + auto resultPromise = std::make_shared<std::promise<Result>>(); + auto valuePromise = std::make_shared<std::promise<T>>(); - bool isComplete; - while (true) { - // Check periodically for Python signals - Py_BEGIN_ALLOW_THREADS isComplete = future.get(res, std::ref(value), std::chrono::milliseconds(100)); - Py_END_ALLOW_THREADS + func([resultPromise, valuePromise](Result result, const T& value) { + valuePromise->set_value(value); + resultPromise->set_value(result); + }); - if (isComplete) { - CHECK_RESULT(res); - return; - } - - if (PyErr_CheckSignals() == -1) { - PyErr_SetInterrupt(); - return; - } - } + internal::waitForResult(*resultPromise); + return valuePromise->get_future().get(); } struct CryptoKeyReaderWrapper { diff --git a/examples/consumer.py b/tests/interrupted_test.py old mode 100755 new mode 100644 similarity index 50% copy from examples/consumer.py copy to tests/interrupted_test.py index 8c2985e..6d61f99 --- a/examples/consumer.py +++ b/tests/interrupted_test.py @@ -18,19 +18,34 @@ # under the License. 
# - +from unittest import TestCase, main import pulsar +import signal +import time +import threading + +class InterruptedTest(TestCase): + + service_url = 'pulsar://localhost:6650' -client = pulsar.Client('pulsar://localhost:6650') -consumer = client.subscribe('my-topic', "my-subscription", - properties={ - "consumer-name": "test-consumer-name", - "consumer-id": "test-consumer-id" - }) + def test_sigint(self): + def thread_function(): + time.sleep(1) + signal.raise_signal(signal.SIGINT) -while True: - msg = consumer.receive() - print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id())) - consumer.acknowledge(msg) + client = pulsar.Client(self.service_url) + consumer = client.subscribe('test-sigint', "my-sub") + thread = threading.Thread(target=thread_function) + thread.start() + + start = time.time() + with self.assertRaises(pulsar.Interrupted): + consumer.receive() + finish = time.time() + print(f"time: {finish - start}") + self.assertGreater(finish - start, 1) + self.assertLess(finish - start, 1.5) + client.close() -client.close() +if __name__ == '__main__': + main() diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh index 13349f9..5168f94 100755 --- a/tests/run-unit-tests.sh +++ b/tests/run-unit-tests.sh @@ -24,4 +24,5 @@ ROOT_DIR=$(git rev-parse --show-toplevel) cd $ROOT_DIR/tests python3 custom_logger_test.py +python3 interrupted_test.py python3 pulsar_test.py