This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c64ea41227 Fix concurrent read lock bug in StampedWriterPreferredLock 
(#11453)
2c64ea41227 is described below

commit 2c64ea41227c798cbd45531eeea5d4cc0cb4b0aa
Author: Chen YZ <[email protected]>
AuthorDate: Thu Nov 2 13:52:05 2023 +0800

    Fix concurrent read lock bug in StampedWriterPreferredLock (#11453)
---
 .../impl/pbtree/StampedWriterPreferredLock.java    | 10 ++---
 .../mtree/lock/StampedWriterPreferredLockTest.java | 43 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
index 66ef3ab2adb..c48624c6a3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
@@ -50,12 +50,12 @@ public class StampedWriterPreferredLock {
   private final Lock lock = new ReentrantLock();
   private final Condition okToRead = lock.newCondition();
   private final Condition okToWrite = lock.newCondition();
-  private long stampAllocator = 0;
+  private volatile long stampAllocator = 0;
 
   private final Map<Long, Integer> readCnt = new HashMap<>();
-  private int readWait = 0;
-  private int writeCnt = 0;
-  private int writeWait = 0;
+  private volatile int readWait = 0;
+  private volatile int writeCnt = 0;
+  private volatile int writeWait = 0;
 
   private final ThreadLocal<Long> sharedOwnerStamp = new ThreadLocal<>();
   /**
@@ -115,7 +115,7 @@ public class StampedWriterPreferredLock {
    * @return read lock stamp
    */
   private long acquireReadLockStamp(boolean prior) {
-    if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
+    while ((prior ? writeCnt : writeCnt + writeWait) > 0) {
       readWait++;
       try {
         okToRead.await();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 2eb8c387896..b005c62605c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -217,4 +217,47 @@ public class StampedWriterPreferredLockTest {
     Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() 
== 4);
     Assert.assertEquals(4, counter.get());
   }
+
+  @Test
+  public void testConcurrent() throws InterruptedException {
+    StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
+    Semaphore semaphore = new Semaphore(0);
+    AtomicInteger counter1 = new AtomicInteger();
+    AtomicInteger counter2 = new AtomicInteger();
+    // main thread get read lock by stamp
+    new Thread(
+            () -> {
+              // writer thread will be blocked util main thread release read 
lock.
+              lock.writeLock();
+              try {
+                semaphore.acquire();
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              lock.unlockWrite();
+              lock.writeLock();
+              counter1.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              counter2.incrementAndGet();
+              lock.unlockWrite();
+            })
+        .start();
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              semaphore.release();
+            })
+        .start();
+    lock.threadReadLock();
+    Assert.assertEquals(counter2.get(), counter1.get());
+    lock.threadReadUnlock();
+  }
 }

Reply via email to