This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
View the commit online: https://github.com/apache/pulsar/commit/fa029700d3edbc398356f6b74cad1120649d2494 The following commit(s) were added to refs/heads/master by this push: new fa02970 Fix consumer of python queue size is zero (#5706) fa02970 is described below commit fa029700d3edbc398356f6b74cad1120649d2494 Author: tuteng <[email protected]> AuthorDate: Thu Nov 21 14:41:39 2019 +0800 Fix consumer of python queue size is zero (#5706) Fixes https://github.com/apache/pulsar/issues/5634 Master Issue: https://github.com/apache/pulsar/issues/5634 ### Motivation In java clients, when we call the `receive`, we will block it until a message arrives. in python clients, when we call the `receive` function, we add a delay parameter of 100ms. when the queue size is 0, the `receive` will have a strict check on the queue size, causing the following exception to be thrown ``` Traceback (most recent call last): File "tst.py", line 10, in <module> msg = consumer.receive() File "/python3.7/site-packages/pulsar/__init__.py", line 930, in receive msg = self._consumer.receive() Exception: Pulsar error: InvalidConfiguration ``` ### Modifications * Removing timeout parameter in synchronous `receive` * Add test for queue size is 0 ### Verifying this change Add Test --- pulsar-client-cpp/python/pulsar_test.py | 17 +++++++++++++++++ pulsar-client-cpp/python/src/consumer.cc | 4 +--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index f31a7e1..3f8bc08 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -174,6 +174,23 @@ class PulsarTest(TestCase): consumer.unsubscribe() client.close() + def test_consumer_queue_size_is_zero(self): + client = Client(self.serviceUrl) + consumer = client.subscribe('my-python-topic-consumer-init-queue-size-is-zero', + 'my-sub', + consumer_type=ConsumerType.Shared, + receiver_queue_size=0, + initial_position=InitialPosition.Earliest) + producer = client.create_producer('my-python-topic-consumer-init-queue-size-is-zero') + producer.send(b'hello') + time.sleep(0.1) + msg = consumer.receive() + self.assertTrue(msg) + self.assertEqual(msg.data(), b'hello') + + consumer.unsubscribe() + client.close() + def test_message_properties(self): client = Client(self.serviceUrl) topic = 'my-python-test-message-properties' diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc index 930b159..815282d 100644 --- a/pulsar-client-cpp/python/src/consumer.cc +++ b/pulsar-client-cpp/python/src/consumer.cc @@ -33,9 +33,7 @@ Message Consumer_receive(Consumer& consumer) { while (true) { Py_BEGIN_ALLOW_THREADS - // Use 100ms timeout to periodically check whether the - // interpreter was interrupted - res = consumer.receive(msg, 100); + res = consumer.receive(msg); Py_END_ALLOW_THREADS if (res != ResultTimeout) {
