ivankelly commented on a change in pull request #11570:
URL: https://github.com/apache/pulsar/pull/11570#discussion_r684284774
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
     return getNumPartitions();
 }
-ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) {
     using namespace std::placeholders;
     std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
     auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
-    producer->getProducerCreatedFuture().addListener(
-        std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
-                  const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+
+    if (conf_.getLazyStartPartitionedProducers()) {
Review comment:
I was suggesting kicking off the update task in ::start() when lazy
producers are enabled. Is numProducersCreated used for anything else?
##########
File path: pulsar-client-cpp/lib/ProducerImpl.cc
##########
@@ -550,10 +559,14 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
         return;
     }
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
-    Lock lock(mutex_);
-    auto failures = batchMessageAndSend();
-    lock.unlock();
-    failures.complete();
+
+    // ignore if the producer is already closing/closed
+    if (assertState(Ready)) {
+        Lock lock(mutex_);
Review comment:
I see that assertState takes the lock, but there's a window between it
returning and the lock being taken again in which the producer could be closed.
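
To illustrate the window described above, here is a minimal sketch of doing the state check and the flush under a single lock acquisition. SketchProducer, add(), close(), onBatchTimer() and pending_ are hypothetical names made up for this example; this is not the actual ProducerImpl code, only the general check-under-lock pattern.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a producer; NOT the real ProducerImpl.
class SketchProducer {
   public:
    enum State { Ready, Closing, Closed };

    // Queue a message only while the producer is Ready.
    void add(int msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (state_ == Ready) {
            pending_.push_back(msg);
        }
    }

    // Close the producer and drop anything still queued.
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        state_ = Closed;
        pending_.clear();
    }

    // Timer callback; returns how many messages were flushed.
    // The state check and the flush share one acquisition of mutex_,
    // so close() cannot run between the check and the flush.
    std::size_t onBatchTimer() {
        std::vector<int> batch;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (state_ != Ready) {
                return 0;  // closing/closed: ignore the timer
            }
            batch.swap(pending_);
        }
        // Completion work runs outside the lock, mirroring how the
        // original code called failures.complete() after lock.unlock().
        return batch.size();
    }

   private:
    std::mutex mutex_;
    State state_ = Ready;
    std::vector<int> pending_;
};
```

In the PR this would presumably mean checking the state directly under the existing lock rather than calling assertState (which locks on its own) first, but whether that fits the real class is for the author to judge.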
##########
File path: pulsar-client-cpp/lib/ProducerImpl.cc
##########
@@ -550,10 +559,14 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
         return;
     }
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
-    Lock lock(mutex_);
-    auto failures = batchMessageAndSend();
-    lock.unlock();
-    failures.complete();
+
+    // ignore if the producer is already closing/closed
+    if (assertState(Ready)) {
+        Lock lock(mutex_);
Review comment:
Shouldn't the assertState check happen inside the lock?
##########
File path: pulsar-client-cpp/lib/PartitionedProducerImpl.cc
##########
@@ -379,8 +418,8 @@ bool PartitionedProducerImpl::isConnected() const {
     Lock producersLock(producersMutex_);
     const auto producers = producers_;
     producersLock.unlock();
-    for (const auto& producer : producers_) {
-        if (!producer->isConnected()) {
+    for (const auto& producer : producers) {
Review comment:
Ya, I have no strong opinion here. "connected" is a weird concept in a
distributed system like this.