erobot opened a new pull request, #185:
URL: https://github.com/apache/pulsar-client-cpp/pull/185
### Motivation
Avoid leaking AckGroupingTracker resources.
A leaked AckGroupingTracker wastes CPU. In our case, after a large number
(300,000+) of consumer creation failures, the Pulsar client IO threads used
99% CPU while idle.
The leak has two causes:
* In the current code, if consumer creation fails,
ConsumerImpl::ackGroupingTrackerPtr_ is never closed.
* AckGroupingTrackerEnabled has a race condition between close and reschedule,
so it may keep rescheduling after close.
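
To illustrate the first cause, here is a minimal sketch of how a timer handler that captures `self` can keep a tracker alive after its owner is gone. It does not use the real Pulsar or Boost.Asio classes: `LeakyTracker`, `trackerSurvivesOwner`, and the plain queue standing in for `boost::asio::io_service` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for the io_service: a queue of pending handlers.
using HandlerQueue = std::queue<std::function<void()>>;

// Hypothetical tracker that reschedules itself and never checks for close.
struct LeakyTracker : std::enable_shared_from_this<LeakyTracker> {
    HandlerQueue& pending;
    int reschedules = 0;

    explicit LeakyTracker(HandlerQueue& q) : pending(q) {}

    void scheduleTimer() {
        auto self = shared_from_this();
        // The handler owns a shared_ptr to the tracker, so the queue keeps it alive.
        pending.push([this, self] {
            ++reschedules;
            scheduleTimer();  // unconditional reschedule
        });
    }
};

// Runs up to `steps` handlers after the owner has dropped its reference and
// reports whether the tracker is still alive afterwards.
inline bool trackerSurvivesOwner(int steps) {
    assert(steps >= 0);
    HandlerQueue pending;
    auto tracker = std::make_shared<LeakyTracker>(pending);
    std::weak_ptr<LeakyTracker> observer = tracker;

    tracker->scheduleTimer();
    tracker.reset();  // owner goes away without closing the tracker

    for (int i = 0; i < steps && !pending.empty(); ++i) {
        auto handler = std::move(pending.front());
        pending.pop();
        handler();  // each run queues another handler holding `self`
    }
    return !observer.expired();
}
```

In this sketch the queue always holds one handler that owns the tracker, so the object is never destroyed and the handler chain never ends, which is consistent with the busy IO threads described above.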
### Modifications
* ConsumerImpl: close ackGroupingTrackerPtr_ on shutdown
* AckGroupingTrackerEnabled: add a state, and check the state before rescheduling
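
The second modification could look roughly like the sketch below. It is not the code in this PR: `FakeExecutor` and `SketchTracker` are hypothetical names, and a simple task queue replaces `boost::asio::io_service` so that the example is self-contained. The idea shown is only that the state is checked both before scheduling and inside the handler.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for an io_service: runs queued callbacks in order.
class FakeExecutor {
   public:
    void post(std::function<void()> task) { tasks_.push(std::move(task)); }

    // Runs at most maxTasks callbacks and returns how many actually ran.
    int run(int maxTasks) {
        assert(maxTasks >= 0);
        int ran = 0;
        while (ran < maxTasks && !tasks_.empty()) {
            auto task = std::move(tasks_.front());
            tasks_.pop();
            task();
            ++ran;
        }
        return ran;
    }

    bool idle() const { return tasks_.empty(); }

   private:
    std::queue<std::function<void()>> tasks_;
};

// Hypothetical tracker that stops rescheduling once it has been closed.
class SketchTracker : public std::enable_shared_from_this<SketchTracker> {
   public:
    explicit SketchTracker(FakeExecutor& executor) : executor_(executor) {}

    void start() { scheduleTimer(); }
    void close() { state_.store(Closed); }
    int flushCount() const { return flushCount_; }

   private:
    enum State { Ready, Closed };

    void scheduleTimer() {
        if (state_.load() != Ready) return;  // check before scheduling
        auto self = shared_from_this();
        executor_.post([this, self] {
            if (state_.load() != Ready) return;  // check again when the timer fires
            ++flushCount_;                       // stands in for flush()
            scheduleTimer();
        });
    }

    FakeExecutor& executor_;
    std::atomic<State> state_{Ready};
    int flushCount_ = 0;
};
```

Once `close()` has been called, the handler that is already queued returns without rescheduling, so the executor drains and the last `self` reference is released.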
### Verifying this change
- [x] Make sure that the change passes the CI checks.
This change can be verified by adding a log statement and running a test
program. The leaked resources are held in boost::asio::io_service, so it does
not seem easy to write a unit test for this.
The test program tries to trigger the AckGroupingTracker leak. If there is a
leak, it prints many 'reschedule AckGroupingTrackerEnabled' test log lines.
Add a test log:
```
void AckGroupingTrackerEnabled::scheduleTimer() {
    // ......
    this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
        if (!ec) {
            this->flush();
            this->scheduleTimer();
            LOG_INFO("reschedule AckGroupingTrackerEnabled");  // add a log when rescheduling
        }
    });
}
```
Test program:
```
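// Assumption: built inside the pulsar-client-cpp source tree, so that the
// internal logging macros (LOG_INFO, LOG_ERROR) are available.
#include <pulsar/Client.h>

#include <cassert>
#include <chrono>
#include <thread>

#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");
    Consumer consumer;
    ConsumerConfiguration config;
    config.setConsumerType(ConsumerType::ConsumerExclusive);
    config.setAckGroupingTimeMs(1);

    // Create the exclusive consumer.
    Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
    if (result != ResultOk) {
        LOG_ERROR("Failed to subscribe: " << result);
        return -1;
    }

    // Try to create more consumers on the same exclusive subscription; each attempt fails.
    for (int i = 0; i < 1000; ++i) {
        Result result =
            client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
        assert(result != ResultOk);
        (void)result;  // avoid an unused-variable warning when assert is compiled out
    }

    consumer.close();
    LOG_INFO("sleep, should not have reschedule logs below");
    std::this_thread::sleep_for(std::chrono::seconds(60));
    client.close();
    return 0;
}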
```
### Documentation
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
Bug fix only.
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)