This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new c970b3a [C++] Fix race condition in BlockingQueue (#8765)
c970b3a is described below
commit c970b3ac22aa10399e65858e17655fc895143b35
Author: TT <[email protected]>
AuthorDate: Tue Dec 8 17:42:57 2020 +0800
[C++] Fix race condition in BlockingQueue (#8765)
### Motivation
BlockingQueue has race condition that can cause threads waiting forever in
multithreading environment. ProducerImpl uses BlockingQueue as
pendingMessagesQueue_ and can be blocked forever at it. This PR fixes race
condition in BlockingQueue.
#### Race condition details
https://github.com/apache/pulsar/blob/91e2f832178d9ffd5d78161145d895910296c2d9/pulsar-client-cpp/lib/BlockingQueue.h#L172-L185
Use BlockingQueue::Pop as example, its procedure is:
1. lock
2. check wasFull and then change queue state
3. unlock
4. if wasFull, notify one thread waiting at queueFullCondition
Race condition sequence:
1. queue is full and there are multiple threads waiting on
queueFullCondition
2. thread A call Pop, lock, wasFull is true, unlock -> queue has one free
space
3. thread B call Pop, lock, wasFull is false, unlock -> queue has two free
spaces
4. thread A notify one thread waiting at queueFullCondition
5. queue is no loger full again
6. result: except one thread is notified by A, other threads waiting on
queueFullCondition are waiting forever
### Modifications
* Use notify_all instead of notify_one to notify threads waiting on
condition variables
Reason: Currently only notify threads when queue is full or empty. After
unlock, other threads may change queue state, so thread to notify condition can
not determine how queue state changed and should use notify_all in case of more
then one change occured.
### Verifying this change
- Add a test case BlockingQueueTest.testPushPopRace to test concurrent
push and pop
(cherry picked from commit 18b38766fbcf3d5944824e828566edca310ee9d8)
---
pulsar-client-cpp/lib/BlockingQueue.h | 12 ++++-----
pulsar-client-cpp/tests/BlockingQueueTest.cc | 39 ++++++++++++++++++++++++++++
2 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/pulsar-client-cpp/lib/BlockingQueue.h
b/pulsar-client-cpp/lib/BlockingQueue.h
index 5e466bd..2814c4f 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -126,7 +126,7 @@ class BlockingQueue {
lock.unlock();
if (wasEmpty) {
// Notify that an element is pushed
- queueEmptyCondition.notify_one();
+ queueEmptyCondition.notify_all();
}
}
@@ -145,7 +145,7 @@ class BlockingQueue {
if (wasEmpty) {
// Notify that an element is pushed
- queueEmptyCondition.notify_one();
+ queueEmptyCondition.notify_all();
}
}
@@ -163,7 +163,7 @@ class BlockingQueue {
if (wasEmpty) {
// Notify that an element is pushed
- queueEmptyCondition.notify_one();
+ queueEmptyCondition.notify_all();
}
return true;
@@ -180,7 +180,7 @@ class BlockingQueue {
if (wasFull) {
// Notify that an element is popped
- queueFullCondition.notify_one();
+ queueFullCondition.notify_all();
}
}
@@ -196,7 +196,7 @@ class BlockingQueue {
if (wasFull) {
// Notify that an element is popped
- queueFullCondition.notify_one();
+ queueFullCondition.notify_all();
}
}
@@ -274,7 +274,7 @@ class BlockingQueue {
if (wasFull) {
// Notify that one spot is now available
- queueFullCondition.notify_one();
+ queueFullCondition.notify_all();
}
}
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc
b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 4047e5e..42644e9 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -19,6 +19,8 @@
#include <gtest/gtest.h>
#include <lib/BlockingQueue.h>
+#include <future>
+#include <iostream>
#include <thread>
class ProducerWorker {
@@ -215,3 +217,40 @@ TEST(BlockingQueueTest, testReservedSpot) {
ASSERT_EQ(0, queue.size());
}
}
+
+TEST(BlockingQueueTest, testPushPopRace) {
+ auto test_logic = []() {
+ size_t size = 5;
+ BlockingQueue<int> queue(size);
+
+ std::vector<std::unique_ptr<ProducerWorker>> producers;
+ for (int i = 0; i < 5; ++i) {
+ producers.emplace_back(new ProducerWorker{queue});
+ producers.back()->produce(1000);
+ }
+
+ // wait for queue full
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ std::vector<std::unique_ptr<ConsumerWorker>> consumers;
+ for (int i = 0; i < 5; ++i) {
+ consumers.emplace_back(new ConsumerWorker{queue});
+ consumers.back()->consume(1000);
+ }
+
+ auto future = std::async(std::launch::async, [&]() {
+ for (auto& p : producers) p->join();
+ for (auto& c : consumers) c->join();
+ });
+ auto ret = future.wait_for(std::chrono::seconds(5));
+ if (ret == std::future_status::ready) {
+ std::cerr << "Exiting";
+ exit(0);
+ } else {
+ std::cerr << "Threads are not exited in time";
+ exit(1);
+ }
+ };
+
+ ASSERT_EXIT(test_logic(), ::testing::ExitedWithCode(0), "Exiting");
+}