erobot opened a new pull request, #185:
URL: https://github.com/apache/pulsar-client-cpp/pull/185

   ### Motivation
   
   Avoid resource leakage of AckGroupingTracker.
   
   A leaked AckGroupingTracker wastes CPU. In our case, after a large number (300,000+) of consumer creation failures, the Pulsar client IO threads used 99% CPU while the client was idle.
   
   The leak has two causes:
   * In the current code, if consumer creation fails, ConsumerImpl::ackGroupingTrackerPtr_ is never closed.
   * AckGroupingTrackerEnabled has a race condition between close and reschedule, so it may keep rescheduling after close.
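
To illustrate why an unclosed tracker keeps consuming CPU, here is a minimal sketch that uses only the standard library. `FakeLoop` and `LeakyTracker` are hypothetical stand-ins (not the real `boost::asio::io_service` or `AckGroupingTrackerEnabled`); the sketch assumes the timer handler captures a shared pointer to the tracker and re-arms itself unconditionally.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>

// Hypothetical stand-in for an event loop such as boost::asio::io_service:
// it only stores callbacks and runs them when asked.
struct FakeLoop {
    std::deque<std::function<void()>> pending;

    // Runs at most maxSteps queued callbacks and returns how many ran.
    int run(int maxSteps) {
        int ran = 0;
        while (ran < maxSteps && !pending.empty()) {
            auto cb = std::move(pending.front());
            pending.pop_front();
            cb();
            ++ran;
        }
        return ran;
    }
};

// Hypothetical tracker that re-arms itself unconditionally.
class LeakyTracker : public std::enable_shared_from_this<LeakyTracker> {
   public:
    explicit LeakyTracker(FakeLoop& loop) : loop_(loop) {}

    void scheduleTimer() {
        auto self = shared_from_this();
        // The queued callback owns a strong reference to the tracker,
        // so the loop keeps the tracker alive while the callback is pending.
        loop_.pending.push_back([self] { self->scheduleTimer(); });
    }

   private:
    FakeLoop& loop_;
};

// Returns true if the tracker is still alive after its owner dropped it
// and the loop ran `steps` callbacks.
bool trackerOutlivesOwner(int steps) {
    FakeLoop loop;
    auto tracker = std::make_shared<LeakyTracker>(loop);
    std::weak_ptr<LeakyTracker> observer = tracker;
    tracker->scheduleTimer();
    tracker.reset();  // the owner (for example a failed consumer) goes away
    int ran = loop.run(steps);
    assert(ran == steps);  // the loop never runs out of work
    return !observer.expired();
}
```

Calling `trackerOutlivesOwner(1000)` returns `true` in this model: every tick queues the next one, so the loop always has work and the tracker is never destroyed.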
   
   ### Modifications
   
   * ConsumerImpl: close ackGroupingTrackerPtr_ on shutdown
   * AckGroupingTrackerEnabled: add a state, and check the state before rescheduling
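
The state check can be sketched roughly as follows, using only the standard library. The names `FakeLoop`, `Tracker`, `State`, and `flushesAfterClose` are hypothetical and are not taken from the actual patch; the sketch also does not model timer cancellation, only the check that stops a pending handler from re-arming after close.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <memory>

// Hypothetical stand-in for an event loop such as boost::asio::io_service.
struct FakeLoop {
    std::deque<std::function<void()>> pending;

    // Runs at most maxSteps queued callbacks and returns how many ran.
    int run(int maxSteps) {
        int ran = 0;
        while (ran < maxSteps && !pending.empty()) {
            auto cb = std::move(pending.front());
            pending.pop_front();
            cb();
            ++ran;
        }
        return ran;
    }
};

// Hypothetical tracker that checks its state before every reschedule.
class Tracker : public std::enable_shared_from_this<Tracker> {
   public:
    enum State { NotStarted, Ready, Closed };

    explicit Tracker(FakeLoop& loop) : loop_(loop) {}

    void start() {
        state_ = Ready;
        scheduleTimer();
    }

    // Assumed to be called from the owner's shutdown path.
    void close() { state_ = Closed; }

    int flushCount() const { return flushes_; }

   private:
    void scheduleTimer() {
        if (state_ != Ready) return;  // never arm a timer once closed
        auto self = shared_from_this();
        loop_.pending.push_back([self] {
            if (self->state_ != Ready) return;  // closed while waiting
            ++self->flushes_;                   // stands in for flush()
            self->scheduleTimer();
        });
    }

    FakeLoop& loop_;
    std::atomic<State> state_{NotStarted};
    int flushes_ = 0;
};

// Returns the number of flushes that happen after close().
int flushesAfterClose() {
    FakeLoop loop;
    auto tracker = std::make_shared<Tracker>(loop);
    tracker->start();
    loop.run(3);  // three timer ticks while Ready
    int before = tracker->flushCount();
    tracker->close();
    loop.run(10);                  // the pending callback sees Closed and stops
    assert(loop.pending.empty());  // nothing was re-armed
    return tracker->flushCount() - before;
}
```

In this model `flushesAfterClose()` returns 0: the one handler that was already queued when `close()` ran observes the `Closed` state and exits without flushing or rescheduling.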
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change can be verified by adding a log line and running a test program. The leaked resources are held in boost::asio::io_service, so it does not seem easy to write a unit test for them.
   
   The test program tries to trigger the AckGroupingTracker leak. If there is a leak, it prints many 'reschedule AckGroupingTrackerEnabled' log lines.
   
   Add a test log:
   ```
   void AckGroupingTrackerEnabled::scheduleTimer() {
       // ......
       this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
           if (!ec) {
               this->flush();
               this->scheduleTimer();
               LOG_INFO("reschedule AckGroupingTrackerEnabled");  // add a log when rescheduling
           }
       });
   }
   ```
   
   Test program:
   ```
   #include <pulsar/Client.h>
   
   #include <cassert>
   #include <chrono>
   #include <thread>
   
   // Internal logging macros; assumes the program is built inside the
   // pulsar-client-cpp source tree.
   #include "lib/LogUtils.h"
   
   DECLARE_LOG_OBJECT()
   
   using namespace pulsar;
   
   int main() {
       Client client("pulsar://localhost:6650");
   
       Consumer consumer;
       ConsumerConfiguration config;
       config.setConsumerType(ConsumerType::ConsumerExclusive);
       config.setAckGroupingTimeMs(1);
   
       // Create the exclusive consumer; this subscription should succeed.
       Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
       if (result != ResultOk) {
           LOG_ERROR("Failed to subscribe: " << result);
           return -1;
       }
   
       // Subscribing again on the same exclusive subscription is expected to fail.
       for (int i = 0; i < 1000; ++i) {
           Result result =
               client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
           assert(result != ResultOk);
           (void)result;
       }
   
       consumer.close();
   
       LOG_INFO("sleep, should not have reschedule logs below");
       std::this_thread::sleep_for(std::chrono::seconds(60));
   
       client.close();
       return 0;
   }
   ```
   
   ### Documentation
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   Bug fix only.
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]