BewareMyPower commented on code in PR #16969:
URL: https://github.com/apache/pulsar/pull/16969#discussion_r952165693
##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -363,13 +401,12 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback
callback) {
auto self = shared_from_this();
int numConsumers = 0;
- consumers_.forEach(
- [&numConsumers, &self, callback](const std::string& name, const
ConsumerImplPtr& consumer) {
- numConsumers++;
- consumer->closeAsync([self, name, callback](Result result) {
- self->handleSingleConsumerClose(result, name, callback);
- });
+ for (const auto& item : consumers_.toPairVector()) {
Review Comment:
I got your point now. However, your test is not exactly the same case. A C++
container (`vector`, `map`, etc.) should not be modified in a range for loop.
For example,
```c++
std::map<int, int> m;
m[1] = 0;
m[2] = 0;
for (const auto& kv : m) {
    m.erase(kv.first);  // invalidates the iterator the loop is using: undefined behavior
}
```
That's because a range for loop just iterates over the underlying RB-tree (or
hash table for a `std::unordered_map`). Erasing the current element invalidates
the iterator that the loop increments next, which is undefined behavior.
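For contrast, here is a minimal sketch of erasing during iteration without touching an invalidated iterator. It relies on `std::map::erase(iterator)` returning the iterator that follows the erased element (C++11 and later). The helper name `eraseZeroValues` is made up for illustration only.

```c++
#include <map>

// Erases every entry whose value is zero while iterating over the map.
// erase(it) returns the iterator following the erased element, so the loop
// never increments an iterator that has already been invalidated.
inline void eraseZeroValues(std::map<int, int>& m) {
    for (auto it = m.begin(); it != m.end();) {
        if (it->second == 0) {
            it = m.erase(it);
        } else {
            ++it;
        }
    }
}
```

A range for loop cannot express this, because it hides the iterator and always increments it after the loop body.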
----
However, the case here is different: your test calls `map.remove` directly in
the range for loop, while `handleSingleConsumerClose` is a callback of
`closeAsync` that is called in another thread. For example, here is a test I
added for closing a multi-topics consumer (it still uses `forEach`, and I
changed the log level in `handleSingleConsumerClose` to info):
```c++
TEST(ConsumerTest, testMultiTopicsConsumerClose) {
    Client client(lookupUrl);
    Consumer consumer;
    client.subscribe({"tp1", "tp2", "tp3"}, "sub", consumer);
    auto send = [&client](const std::string& topic) {
        Producer producer;
        client.createProducer(topic, producer);
        producer.send(MessageBuilder().setContent("hello").build());
    };
    send("tp1");
    send("tp2");
    send("tp3");
    while (true) {
        Message msg;
        auto result = consumer.receive(msg, 1000);
        if (result != ResultOk) {
            break;
        }
        LOG_INFO("Received " << msg.getDataAsString() << " from " << msg.getTopicName() << " "
                             << msg.getMessageId());
    }
    auto result = consumer.close();
    LOG_INFO("Consumer close: " << result);
}
```
The output is:
```
2022-08-23 13:22:10.567 INFO [0x11d299600] ConsumerImpl:991 | [persistent://public/default/tp3, sub, 2] Closing consumer for topic persistent://public/default/tp3
2022-08-23 13:22:10.567 INFO [0x11d299600] ConsumerImpl:991 | [persistent://public/default/tp2, sub, 1] Closing consumer for topic persistent://public/default/tp2
2022-08-23 13:22:10.567 INFO [0x11d299600] ConsumerImpl:991 | [persistent://public/default/tp1, sub, 0] Closing consumer for topic persistent://public/default/tp1
2022-08-23 13:22:10.572 INFO [0x70000d14e000] ConsumerImpl:1042 | [persistent://public/default/tp3, sub, 2] Closed consumer 2
2022-08-23 13:22:10.572 INFO [0x70000d14e000] MultiTopicsConsumerImpl:395 | Closing the consumer for partition - persistent://public/default/tp3 numberTopicPartitions_ - 3
2022-08-23 13:22:10.572 INFO [0x70000d14e000] ConsumerImpl:1042 | [persistent://public/default/tp2, sub, 1] Closed consumer 1
2022-08-23 13:22:10.572 INFO [0x70000d14e000] MultiTopicsConsumerImpl:395 | Closing the consumer for partition - persistent://public/default/tp2 numberTopicPartitions_ - 2
2022-08-23 13:22:10.572 INFO [0x70000d14e000] ConsumerImpl:1042 | [persistent://public/default/tp1, sub, 0] Closed consumer 0
2022-08-23 13:22:10.572 INFO [0x70000d14e000] MultiTopicsConsumerImpl:395 | Closing the consumer for partition - persistent://public/default/tp1 numberTopicPartitions_ - 1
```
The id of the thread that calls `closeAsync` (which calls `forEach` on a
`SynchronizedHashMap`) is 0x11d299600, while the id of the thread that calls
`handleSingleConsumerClose` (which calls `remove` or `clear` on a
`SynchronizedHashMap`) is 0x70000d14e000.
**BTW, I found there is still a chance that `handleSingleConsumerClose` is
called directly in the current thread. I think it's a bug that needs to be
fixed. I might open another PR to fix it.**
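
To illustrate why iterating over a snapshot (as the `toPairVector()` loop in this diff does) tolerates a callback that runs in the current thread, here is a rough sketch. `SyncMapSketch` and `removeAllViaSnapshot` are hypothetical names written for this comment; this is not the actual `SynchronizedHashMap` implementation, and it only assumes that `toPairVector()` copies the entries under the lock.

```c++
#include <cstddef>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a synchronized map; NOT the real
// SynchronizedHashMap from pulsar-client-cpp.
template <typename K, typename V>
class SyncMapSketch {
   public:
    void put(const K& key, const V& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        data_[key] = value;
    }

    void remove(const K& key) {
        std::lock_guard<std::mutex> lock(mutex_);
        data_.erase(key);
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return data_.size();
    }

    // Copies all entries while holding the lock and returns the copy.
    std::vector<std::pair<K, V>> toPairVector() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return std::vector<std::pair<K, V>>(data_.begin(), data_.end());
    }

   private:
    mutable std::mutex mutex_;
    std::unordered_map<K, V> data_;
};

// Removes every entry from inside the loop body, in the calling thread.
// The loop walks the snapshot, so removing from the map does not invalidate
// the iterator in use, and the lock is not held while the body runs.
inline std::size_t removeAllViaSnapshot(SyncMapSketch<std::string, int>& map) {
    std::size_t visited = 0;
    for (const auto& item : map.toPairVector()) {
        map.remove(item.first);
        ++visited;
    }
    return visited;
}
```

The trade-off is one extra copy of the entries per iteration, which should be negligible for a handful of consumers.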