This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 4192bad3af7 [To rel/1.2] Fix lock bug and improve schema file write policy in PBTree (#11477)
4192bad3af7 is described below

commit 4192bad3af76eb4a1e303bb040b3b5ebf46d144d
Author: Marcos_Zyk <[email protected]>
AuthorDate: Mon Nov 6 16:49:41 2023 +0800

    [To rel/1.2] Fix lock bug and improve schema file write policy in PBTree (#11477)
---
 .../schema/source/TimeSeriesSchemaSource.java      |  6 ++-
 .../mtree/impl/pbtree/CachedMTreeStore.java        |  6 +++
 .../impl/pbtree/StampedWriterPreferredLock.java    | 10 ++--
 .../mtree/impl/pbtree/schemafile/SchemaFile.java   |  8 ++-
 .../pbtree/schemafile/pagemgr/PageManager.java     | 58 ++++++++++++++++------
 .../mtree/lock/StampedWriterPreferredLockTest.java | 43 ++++++++++++++++
 .../mtree/schemafile/SchemaFileLogTest.java        | 13 +++++
 7 files changed, 123 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 3da6a14e4f6..38cbaa83476 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static 
org.apache.iotdb.db.schemaengine.SchemaConstant.ALL_MATCH_PATTERN;
 
 public class TimeSeriesSchemaSource implements 
ISchemaSource<ITimeSeriesSchemaInfo> {
@@ -117,7 +118,10 @@ public class TimeSeriesSchemaSource implements 
ISchemaSource<ITimeSeriesSchemaIn
 
   @Override
   public boolean hasSchemaStatistic(ISchemaRegion schemaRegion) {
-    return pathPattern.equals(ALL_MATCH_PATTERN) && (schemaFilter == null);
+    return (pathPattern.equals(ALL_MATCH_PATTERN)
+            || (pathPattern.getMeasurement().equals(MULTI_LEVEL_PATH_WILDCARD)
+                && 
schemaRegion.getDatabaseFullPath().startsWith(pathPattern.getDevice())))
+        && (schemaFilter == null);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 2bf9b3a698f..1103793f4ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -556,6 +556,8 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
           return;
         }
       }
+
+      long startTime = System.currentTimeMillis();
       List<ICachedMNode> nodesToPersist = cacheManager.collectVolatileMNodes();
       for (ICachedMNode volatileNode : nodesToPersist) {
         try {
@@ -569,6 +571,10 @@ public class CachedMTreeStore implements 
IMTreeStore<ICachedMNode> {
         }
         cacheManager.updateCacheStatusAfterPersist(volatileNode);
       }
+      logger.info(
+          "It takes {}ms to flush MTree in SchemaRegion {}",
+          (System.currentTimeMillis() - startTime),
+          regionStatistics.getSchemaRegionId());
       if (updatedStorageGroupMNode != null || !nodesToPersist.isEmpty()) {
         flushCallback.run();
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
index 66ef3ab2adb..c48624c6a3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
@@ -50,12 +50,12 @@ public class StampedWriterPreferredLock {
   private final Lock lock = new ReentrantLock();
   private final Condition okToRead = lock.newCondition();
   private final Condition okToWrite = lock.newCondition();
-  private long stampAllocator = 0;
+  private volatile long stampAllocator = 0;
 
   private final Map<Long, Integer> readCnt = new HashMap<>();
-  private int readWait = 0;
-  private int writeCnt = 0;
-  private int writeWait = 0;
+  private volatile int readWait = 0;
+  private volatile int writeCnt = 0;
+  private volatile int writeWait = 0;
 
   private final ThreadLocal<Long> sharedOwnerStamp = new ThreadLocal<>();
   /**
@@ -115,7 +115,7 @@ public class StampedWriterPreferredLock {
    * @return read lock stamp
    */
   private long acquireReadLockStamp(boolean prior) {
-    if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
+    while ((prior ? writeCnt : writeCnt + writeWait) > 0) {
       readWait++;
       try {
         okToRead.await();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 989720f664b..d47520538b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
 import org.apache.iotdb.db.schemaengine.SchemaConstant;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -141,7 +143,11 @@ public class SchemaFile implements ISchemaFile {
     return new SchemaFile(
         sgName,
         schemaRegionId,
-        !pmtFile.exists(),
+        !pmtFile.exists()
+            || IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getSchemaRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.RATIS_CONSENSUS),
         CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
         false);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 7d58cf6a12d..d4065b94203 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -20,6 +20,8 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -89,6 +91,9 @@ public abstract class PageManager implements IPageManager {
   private final AtomicInteger logCounter;
   private SchemaFileLogWriter logWriter;
 
+  // flush strategy is dependent on consensus protocol, only check protocol on 
init
+  protected FlushPageStrategy flushDirtyPagesStrategy;
+
   PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String 
logPath)
       throws IOException, MetadataException {
     this.pageInstCache =
@@ -107,10 +112,21 @@ public abstract class PageManager implements IPageManager 
{
     this.pmtFile = pmtFile;
     this.readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
 
-    // recover if log exists
-    int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
-    this.logWriter = new SchemaFileLogWriter(logPath);
-    logCounter = new AtomicInteger(pageAcc);
+    if (IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getSchemaRegionConsensusProtocolClass()
+        .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      // with RATIS enabled, integrity is guaranteed by consensus protocol
+      logCounter = new AtomicInteger();
+      logWriter = null;
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+    } else {
+      // without RATIS, utilize physical logging for integrity
+      int pageAcc = (int) recoverFromLog(logPath) / 
SchemaFileConfig.PAGE_LENGTH;
+      this.logWriter = new SchemaFileLogWriter(logPath);
+      logCounter = new AtomicInteger(pageAcc);
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+    }
 
     // construct first page if file to init
     if (lastPageIndex < 0) {
@@ -405,29 +421,41 @@ public abstract class PageManager implements IPageManager 
{
     return lastPageIndex.get();
   }
 
-  @Override
-  public void flushDirtyPages() throws IOException {
-    if (dirtyPages.size() == 0) {
-      return;
-    }
+  @FunctionalInterface
+  interface FlushPageStrategy {
+    void apply() throws IOException;
+  }
 
-    // TODO: better performance expected while ensuring integrity when 
exception interrupts
+  private void flushDirtyPagesWithLogging() throws IOException {
     if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
       logWriter = logWriter.renew();
       logCounter.set(0);
     }
-
     logCounter.addAndGet(dirtyPages.size());
     for (ISchemaPage page : dirtyPages.values()) {
       page.syncPageBuffer();
       logWriter.write(page);
     }
     logWriter.prepare();
-
     for (ISchemaPage page : dirtyPages.values()) {
       page.flushPageToChannel(channel);
     }
     logWriter.commit();
+  }
+
+  private void flushDirtyPagesWithoutLogging() throws IOException {
+    for (ISchemaPage page : dirtyPages.values()) {
+      page.syncPageBuffer();
+      page.flushPageToChannel(channel);
+    }
+  }
+
+  @Override
+  public void flushDirtyPages() throws IOException {
+    if (dirtyPages.size() == 0) {
+      return;
+    }
+    flushDirtyPagesStrategy.apply();
     dirtyPages.clear();
     Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
   }
@@ -438,7 +466,7 @@ public abstract class PageManager implements IPageManager {
     Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
     pageInstCache.clear();
     lastPageIndex.set(0);
-    logWriter = logWriter.renew();
+    logWriter = logWriter == null ? null : logWriter.renew();
   }
 
   @Override
@@ -454,7 +482,9 @@ public abstract class PageManager implements IPageManager {
 
   @Override
   public void close() throws IOException {
-    logWriter.close();
+    if (logWriter != null) {
+      logWriter.close();
+    }
   }
 
   // endregion
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 2eb8c387896..b005c62605c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -217,4 +217,47 @@ public class StampedWriterPreferredLockTest {
     Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() 
== 4);
     Assert.assertEquals(4, counter.get());
   }
+
+  @Test
+  public void testConcurrent() throws InterruptedException {
+    StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
+    Semaphore semaphore = new Semaphore(0);
+    AtomicInteger counter1 = new AtomicInteger();
+    AtomicInteger counter2 = new AtomicInteger();
+    // main thread get read lock by stamp
+    new Thread(
+            () -> {
+              // writer thread will be blocked util main thread release read 
lock.
+              lock.writeLock();
+              try {
+                semaphore.acquire();
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              lock.unlockWrite();
+              lock.writeLock();
+              counter1.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              counter2.incrementAndGet();
+              lock.unlockWrite();
+            })
+        .start();
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(500);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              semaphore.release();
+            })
+        .start();
+    lock.threadReadLock();
+    Assert.assertEquals(counter2.get(), counter1.get());
+    lock.threadReadUnlock();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index c994471d17b..8b8bb8416b1 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.schemaengine.SchemaConstant;
 import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -73,6 +75,13 @@ public class SchemaFileLogTest {
 
   @Test
   public void essentialLogTest() throws IOException, MetadataException {
+    // select SIMPLE consensus to trigger logging
+    String previousConsensus =
+        
IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS);
+
     SchemaFile sf =
         (SchemaFile) SchemaFile.initSchemaFile("root.test.vRoot1", 
TEST_SCHEMA_REGION_ID);
     IDatabaseMNode<ICachedMNode> newSGNode =
@@ -163,5 +172,9 @@ public class SchemaFileLogTest {
     }
     Assert.assertEquals(cnt, cnt2);
     sf.close();
+
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setSchemaRegionConsensusProtocolClass(previousConsensus);
   }
 }

Reply via email to