BewareMyPower commented on code in PR #16969:
URL: https://github.com/apache/pulsar/pull/16969#discussion_r952165693


##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -363,13 +401,12 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback 
callback) {
 
     auto self = shared_from_this();
     int numConsumers = 0;
-    consumers_.forEach(
-        [&numConsumers, &self, callback](const std::string& name, const 
ConsumerImplPtr& consumer) {
-            numConsumers++;
-            consumer->closeAsync([self, name, callback](Result result) {
-                self->handleSingleConsumerClose(result, name, callback);
-            });
+    for (const auto& item : consumers_.toPairVector()) {

Review Comment:
   I got your point now. However, your test is not exactly the same case. A C++ 
container (`vector`, `map`, etc.) should not be modified in a range for loop. 
For example,
   
   ```c+
       std::map<int, int> m;
       m[1] = 0;
       m[2] = 0;
       for (const auto& kv : m) {
           m.erase(kv.first);
       }
   ```
   
   It's because a range for loop just iterates over the underlying RB-tree (or 
hash table against a `std::unordered_map`). Modifying it can make the next 
iterator invalid.
   
   ----
   
   However, the case here is different that you call `map.remove` directly in 
the range for loop, while the `handleSingleConsumerClose` is a callback of 
`closeAsync` that is called in another thread. For example, here is a test I 
added for the close of multi-topics consumer (still use `forEach`, and change 
the log of `handleSingleConsumerClose` to info):
   
   ```c++
   TEST(ConsumerTest, testMultiTopicsConsumerClose) {
       Client client(lookupUrl);
   
       Consumer consumer;
       client.subscribe({"tp1", "tp2", "tp3"}, "sub", consumer);
   
       auto send = [&client](const std::string& topic) {
           Producer producer;
           client.createProducer(topic, producer);
           producer.send(MessageBuilder().setContent("hello").build());
       };
       send("tp1");
       send("tp2");
       send("tp3");
   
       while (true) {
           Message msg;
           auto result = consumer.receive(msg, 1000);
           if (result != ResultOk) {
               break;
           }
           LOG_INFO("Received " << msg.getDataAsString() << " from " << 
msg.getTopicName() << " "
                                << msg.getMessageId());
       }
   
       auto result = consumer.close();
       LOG_INFO("Consumer close: " << result);
   }
   ```
   
   The outputs are:
   
   ```
   2022-08-23 13:22:10.567 INFO  [0x11d299600] ConsumerImpl:991 | 
[persistent://public/default/tp3, sub, 2] Closing consumer for topic 
persistent://public/default/tp3
   2022-08-23 13:22:10.567 INFO  [0x11d299600] ConsumerImpl:991 | 
[persistent://public/default/tp2, sub, 1] Closing consumer for topic 
persistent://public/default/tp2
   2022-08-23 13:22:10.567 INFO  [0x11d299600] ConsumerImpl:991 | 
[persistent://public/default/tp1, sub, 0] Closing consumer for topic 
persistent://public/default/tp1
   2022-08-23 13:22:10.572 INFO  [0x70000d14e000] ConsumerImpl:1042 | 
[persistent://public/default/tp3, sub, 2] Closed consumer 2
   2022-08-23 13:22:10.572 INFO  [0x70000d14e000] MultiTopicsConsumerImpl:395 | 
Closing the consumer for partition - persistent://public/default/tp3 
numberTopicPartitions_ - 3
   2022-08-23 13:22:10.572 INFO  [0x70000d14e000] ConsumerImpl:1042 | 
[persistent://public/default/tp2, sub, 1] Closed consumer 1
   2022-08-23 13:22:10.572 INFO  [0x70000d14e000] MultiTopicsConsumerImpl:395 | 
Closing the consumer for partition - persistent://public/default/tp2 
numberTopicPartitions_ - 2
   2022-08-23 13:22:10.572 INFO  [0x70000d14e000] ConsumerImpl:1042 | 
[persistent://public/default/tp1, sub, 0] Closed consumer 0
   2022-08-23 13:22:10.572 INFO  [0x70000d14e000] MultiTopicsConsumerImpl:395 | 
Closing the consumer for partition - persistent://public/default/tp1 
numberTopicPartitions_ - 1
   ```
   
   The id of the thread that calls `closeAsync` (which calls `forEach` on a 
`SynchronizedHashMap`) is 0x11d299600, while the id of the thread that calls 
`handleSingleConsumerClose` (which calls `remove` or `clear` on a 
`SynchronizedHashMap`) is 0x70000d14e000.
   
   **BTW, I found there is still a chance that `handleSingleConsumerClose` is 
called directly in the current thread. I think it's a bug that needs to be 
fixed. I might open another PR to fix it.**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to