BewareMyPower commented on code in PR #228:
URL:
https://github.com/apache/pulsar-client-python/pull/228#discussion_r1867009999
##########
src/client.cc:
##########
@@ -41,19 +42,38 @@ Consumer Client_subscribe(Client& client, const
std::string& topic, const std::s
[&](SubscribeCallback callback) { client.subscribeAsync(topic,
subscriptionName, conf, callback); });
}
+void Client_subscribeAsync(Client& client, const std::string& topic, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf, SubscribeCallback
callback) {
+ py::gil_scoped_release release;
+ client.subscribeAsync(topic, subscriptionName, conf, callback);
+}
+
Consumer Client_subscribe_topics(Client& client, const
std::vector<std::string>& topics,
const std::string& subscriptionName, const
ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>(
[&](SubscribeCallback callback) { client.subscribeAsync(topics,
subscriptionName, conf, callback); });
}
+void Client_subscribe_topicsAsync(Client& client, const
std::vector<std::string>& topics, const std::string& subscriptionName, const
ConsumerConfiguration& conf, SubscribeCallback callback){
+ client.subscribeAsync(topics, subscriptionName, conf, [callback](Result
result, pulsar::Consumer consumer){
+ callback(result, consumer);
+ });
+}
+
Consumer Client_subscribe_pattern(Client& client, const std::string&
topic_pattern,
const std::string& subscriptionName, const
ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf,
callback);
});
}
+void Client_subscribe_patternAsync(Client& client, const std::string&
topic_pattern, const std::string& subscriptionName, const
ConsumerConfiguration& conf, SubscribeCallback callback){
+ client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf,
[callback](Result result, Consumer consumer){
+ py::gil_scoped_acquire acquire;
+ callback(result, consumer);
Review Comment:
You added three `subscribe` related async methods. However, their
implementations are not consistent.
- `Client_subscribeAsync`: releases the GIL first, and then call the
pulsar-client-cpp API directly
- `Client_subscribe_topicsAsync`: call the pulsar-client-cpp API and wrap
the callback again
- `Client_subscribe_patternAsync`: call the pulsar-client-cpp API and wrap
the callback with acquiring the GIL before executing the callback
IMO, the 1st way is correct. You can also take `Producer_sendAsync` as
example. Though it does not use `py::gil_scoped_acquire` directly.
##########
tests/asyncio_test.py:
##########
@@ -19,17 +19,24 @@
#
import asyncio
+from typing import Iterable
+
+from _pulsar import ConsumerType
+
import pulsar
from pulsar.asyncio import (
Client,
PulsarException,
+ Consumer
)
from unittest import (
main,
IsolatedAsyncioTestCase,
)
-service_url = 'pulsar://localhost:6650'
+# TODO: Write tests for everything else
+
Review Comment:
What does the TODO here mean?
##########
src/consumer.cc:
##########
@@ -4,7 +4,7 @@
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
Review Comment:
This change is still not reverted
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]