This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new c970b3a  [C++] Fix race condition in BlockingQueue (#8765)
c970b3a is described below

commit c970b3ac22aa10399e65858e17655fc895143b35
Author: TT <[email protected]>
AuthorDate: Tue Dec 8 17:42:57 2020 +0800

    [C++] Fix race condition in BlockingQueue (#8765)
    
    ### Motivation
    
    BlockingQueue has race condition that can cause threads waiting forever in 
multithreading environment. ProducerImpl uses BlockingQueue as 
pendingMessagesQueue_ and can be blocked forever at it. This PR fixes race 
condition in BlockingQueue.
    
    #### Race condition details
    
https://github.com/apache/pulsar/blob/91e2f832178d9ffd5d78161145d895910296c2d9/pulsar-client-cpp/lib/BlockingQueue.h#L172-L185
    Use BlockingQueue::Pop as example, its procedure is:
    1. lock
    2. check wasFull and then change queue state
    3. unlock
    4. if wasFull, notify one thread waiting at queueFullCondition
    
    Race condition sequence:
    1. queue is full and there are multiple threads waiting on 
queueFullCondition
    2. thread A call Pop, lock, wasFull is true, unlock -> queue has one free 
space
    3. thread B call Pop, lock, wasFull is false, unlock -> queue has two free 
spaces
    4. thread A notify one thread waiting at queueFullCondition
    5. queue is no loger full again
    6. result: except one thread is notified by A, other threads waiting on 
queueFullCondition are waiting forever
    
    ### Modifications
    
    * Use notify_all instead of notify_one to notify threads waiting on 
condition variables
      Reason: Currently only notify threads when queue is full or empty. After 
unlock, other threads may change queue state, so thread to notify condition can 
not determine how queue state changed and should use notify_all in case of more 
then one  change occured.
    
    ### Verifying this change
    
      - Add a test case BlockingQueueTest.testPushPopRace to test concurrent 
push and pop
    
    (cherry picked from commit 18b38766fbcf3d5944824e828566edca310ee9d8)
---
 pulsar-client-cpp/lib/BlockingQueue.h        | 12 ++++-----
 pulsar-client-cpp/tests/BlockingQueueTest.cc | 39 ++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/BlockingQueue.h 
b/pulsar-client-cpp/lib/BlockingQueue.h
index 5e466bd..2814c4f 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -126,7 +126,7 @@ class BlockingQueue {
         lock.unlock();
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
     }
 
@@ -145,7 +145,7 @@ class BlockingQueue {
 
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
     }
 
@@ -163,7 +163,7 @@ class BlockingQueue {
 
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
 
         return true;
@@ -180,7 +180,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that an element is popped
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
@@ -196,7 +196,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that an element is popped
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
@@ -274,7 +274,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that one spot is now available
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc 
b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 4047e5e..42644e9 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -19,6 +19,8 @@
 #include <gtest/gtest.h>
 #include <lib/BlockingQueue.h>
 
+#include <future>
+#include <iostream>
 #include <thread>
 
 class ProducerWorker {
@@ -215,3 +217,40 @@ TEST(BlockingQueueTest, testReservedSpot) {
         ASSERT_EQ(0, queue.size());
     }
 }
+
+TEST(BlockingQueueTest, testPushPopRace) {
+    auto test_logic = []() {
+        size_t size = 5;
+        BlockingQueue<int> queue(size);
+
+        std::vector<std::unique_ptr<ProducerWorker>> producers;
+        for (int i = 0; i < 5; ++i) {
+            producers.emplace_back(new ProducerWorker{queue});
+            producers.back()->produce(1000);
+        }
+
+        // wait for queue full
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+        std::vector<std::unique_ptr<ConsumerWorker>> consumers;
+        for (int i = 0; i < 5; ++i) {
+            consumers.emplace_back(new ConsumerWorker{queue});
+            consumers.back()->consume(1000);
+        }
+
+        auto future = std::async(std::launch::async, [&]() {
+            for (auto& p : producers) p->join();
+            for (auto& c : consumers) c->join();
+        });
+        auto ret = future.wait_for(std::chrono::seconds(5));
+        if (ret == std::future_status::ready) {
+            std::cerr << "Exiting";
+            exit(0);
+        } else {
+            std::cerr << "Threads are not exited in time";
+            exit(1);
+        }
+    };
+
+    ASSERT_EXIT(test_logic(), ::testing::ExitedWithCode(0), "Exiting");
+}

Reply via email to