BewareMyPower edited a comment on issue #7851:
URL: https://github.com/apache/pulsar/issues/7851#issuecomment-677716498
Though make `clear()` thread-safe may be better, **I'm not sure if the crash
is caused by this**.
First, make a correction that `clear()` is called only when the connection
is established:
```c++
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
/* ... */
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();
/* ... */
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_, subscriptionMode_,
startMessageId_, readCompacted_, config_.getProperties(),
config_.getSchema(), getInitialPosition());
// trigger the `handleCreateConsumer()` method after receiving response
from broker
cnx->sendRequestWithId(cmd, requestId)
.addListener(
std::bind(&ConsumerImpl::handleCreateConsumer,
shared_from_this(), cnx, std::placeholders::_1));
}
```
The future is completed in `handleCreateConsumer`, which is after
`connectionOpened`:
```c++
void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx,
Result result) {
static bool firstTime = true;
if (result == ResultOk) {
/* ... */
consumerCreatedPromise_.setValue(shared_from_this()); //
getConsumerCreatedFuture() is completed
} else {
```
Then `ClientImpl::handleConsumerCreated` will be called, see:
```c++
void ClientImpl::handleSubscribe(const Result result, const
LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, const std::string&
consumerName,
ConsumerConfiguration conf,
SubscribeCallback callback) {
if (result == ResultOk) {
/* ... */
// `handleConsumerCreated` is called when getConsumerCreatedFuture()
is completed
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
Lock lock(mutex_);
consumers_.push_back(consumer);
lock.unlock();
consumer->start();
} else {
```
```c++
void ClientImpl::handleConsumerCreated(Result result,
ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback,
ConsumerImplBasePtr consumer) {
callback(result, Consumer(consumer)); // Finally, the consumer argument
of `Client::subscribe` is set to this consumer
}
```
Therefore, the consumer could only call `acknowledge` or `receive` after
`subscribe` returned, the race condition couldn't happen even if `clear()` of
trackers is not thread-safe.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]