ivankelly commented on a change in pull request #11570:
URL: https://github.com/apache/pulsar/pull/11570#discussion_r683470062
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -209,7 +236,10 @@ void PartitionedProducerImpl::closeAsync(CloseCallback
closeCallback) {
// Here we don't need `producersMutex` to protect `producers_`, because
`producers_` can only be increased
// when `state_` is Ready
for (auto& producer : producers_) {
- if (!producer->isClosed()) {
+ if (!producer->isStarted()) {
Review comment:
what happens if a client does a sendAsync concurrently with closeAsync?
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -161,7 +182,13 @@ void PartitionedProducerImpl::sendAsync(const Message&
msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];
+
+ if (conf_.getLazyStartPartitionedProducers() && !producer->isStarted()) {
+ producer->start();
Review comment:
is producer::start() idempotent?
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -379,8 +418,8 @@ bool PartitionedProducerImpl::isConnected() const {
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
- for (const auto& producer : producers_) {
- if (!producer->isConnected()) {
+ for (const auto& producer : producers) {
Review comment:
If the producer has been created, but no messages have been sent, this
will return true, even though there has been no connections.
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -161,7 +182,13 @@ void PartitionedProducerImpl::sendAsync(const Message&
msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];
+
+ if (conf_.getLazyStartPartitionedProducers() && !producer->isStarted()) {
Review comment:
is checking the conf needed here?
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -87,13 +87,18 @@ unsigned int
PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}
-ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int
partition) const {
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int
partition) {
using namespace std::placeholders;
std::string topicPartitionName =
topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_,
topicPartitionName, conf_, partition);
- producer->getProducerCreatedFuture().addListener(
-
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
-
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2,
partition));
+
+ if (conf_.getLazyStartPartitionedProducers()) {
Review comment:
This looks like a noop. Rather than having "createLazyPartitionProducer"
count the number of producers created and change state when all done, you could
change ::start() to change the state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]