This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b8faac5f75 PBTree: Logging Disabled with Ratis and Pre-Allocation 
Disabled for devices using template (#11467)
0b8faac5f75 is described below

commit 0b8faac5f75ab8a7e63bde5c6dbdffb3fbc2d49e
Author: ZhaoXin <[email protected]>
AuthorDate: Tue Nov 7 19:21:09 2023 +0800

    PBTree: Logging Disabled with Ratis and Pre-Allocation Disabled for devices 
using template (#11467)
---
 .../mtree/impl/pbtree/schemafile/SchemaFile.java   | 35 ++++++++---
 .../schemafile/pagemgr/BTreePageManager.java       |  6 +-
 .../pbtree/schemafile/pagemgr/PageManager.java     | 71 +++++++++++++++++-----
 .../mtree/schemafile/SchemaFileLogTest.java        | 13 ++++
 .../schemaRegion/SchemaRegionTemplateTest.java     | 42 +------------
 5 files changed, 102 insertions(+), 65 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index c4781991eba..c703e641345 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -141,7 +143,11 @@ public class SchemaFile implements ISchemaFile {
     return new SchemaFile(
         sgName,
         schemaRegionId,
-        !pmtFile.exists(),
+        !pmtFile.exists()
+            || IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getSchemaRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.RATIS_CONSENSUS),
         CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
         false);
   }
@@ -223,11 +229,18 @@ public class SchemaFile implements ISchemaFile {
       setNodeAddress(node, lastSGAddr);
     } else {
       if (curSegAddr < 0L) {
+        if (node.isDevice() && node.getAsDeviceMNode().isUseTemplate()) {
+          throw new MetadataException(
+              String.format(
+                  "Adding or updating children of device using template [%s] 
is NOT allowed.",
+                  node.getFullPath()));
+        }
+
         // now only 32 bits page index is allowed
         throw new MetadataException(
             String.format(
-                "Cannot store a node with segment address [%s] except for 
StorageGroupNode.",
-                curSegAddr));
+                "Cannot flush any node with negative address [%s] except for 
DatabaseNode.",
+                node.getFullPath()));
       }
     }
 
@@ -470,11 +483,19 @@ public class SchemaFile implements ISchemaFile {
     File schemaFile =
         SystemFileFactory.INSTANCE.getFile(
             getDirPath(sgName, schemaRegionId), 
SchemaConstant.PBTREE_FILE_NAME);
-    File schemaLogFile =
-        SystemFileFactory.INSTANCE.getFile(
-            getDirPath(sgName, schemaRegionId), 
SchemaConstant.PBTREE_LOG_FILE_NAME);
     Files.deleteIfExists(schemaFile.toPath());
-    Files.deleteIfExists(schemaLogFile.toPath());
+
+    if (!IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getSchemaRegionConsensusProtocolClass()
+        .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      // schemaFileLog disabled with RATIS consensus
+      File schemaLogFile =
+          SystemFileFactory.INSTANCE.getFile(
+              getDirPath(sgName, schemaRegionId), 
SchemaConstant.PBTREE_LOG_FILE_NAME);
+      Files.deleteIfExists(schemaLogFile.toPath());
+    }
+
     Files.copy(snapshot.toPath(), schemaFile.toPath());
     return new SchemaFile(
         sgName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
index 43c60624d4a..e80c4dfb1dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
@@ -303,7 +303,8 @@ public class BTreePageManager extends PageManager {
     tarPage.getAsSegmentedPage().removeRecord(getSegIndex(recSegAddr), 
node.getName());
 
     // remove segments belongs to node
-    if (!node.isMeasurement()) {
+    if (!node.isMeasurement() && getNodeAddress(node) > 0) {
+      // node with maliciously modified address may result in orphan pages
       long delSegAddr = getNodeAddress(node);
       tarPage = getPageInstance(getPageIndex(delSegAddr));
 
@@ -317,6 +318,9 @@ public class BTreePageManager extends PageManager {
       }
 
       if (tarPage.getAsInternalPage() != null) {
+        // If the deleted one points to an Internal (root of BTree), there are 
two BTrees to handle:
+        //  one mapping node names to record buffers, and another mapping 
aliases to names. </br>
+        // All of those are turned into SegmentedPage.
         Deque<Integer> cascadePages = new 
ArrayDeque<>(tarPage.getAsInternalPage().getAllRecords());
         cascadePages.add(tarPage.getPageIndex());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 7d58cf6a12d..eb36b6d2be7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -20,6 +20,8 @@ package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -89,6 +91,9 @@ public abstract class PageManager implements IPageManager {
   private final AtomicInteger logCounter;
   private SchemaFileLogWriter logWriter;
 
+  // flush strategy is dependent on consensus protocol, only check protocol on 
init
+  protected FlushPageStrategy flushDirtyPagesStrategy;
+
   PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String 
logPath)
       throws IOException, MetadataException {
     this.pageInstCache =
@@ -107,10 +112,21 @@ public abstract class PageManager implements IPageManager 
{
     this.pmtFile = pmtFile;
     this.readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
 
-    // recover if log exists
-    int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
-    this.logWriter = new SchemaFileLogWriter(logPath);
-    logCounter = new AtomicInteger(pageAcc);
+    if (IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getSchemaRegionConsensusProtocolClass()
+        .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      // with RATIS enabled, integrity is guaranteed by consensus protocol
+      logCounter = new AtomicInteger();
+      logWriter = null;
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+    } else {
+      // without RATIS, utilize physical logging for integrity
+      int pageAcc = (int) recoverFromLog(logPath) / 
SchemaFileConfig.PAGE_LENGTH;
+      this.logWriter = new SchemaFileLogWriter(logPath);
+      logCounter = new AtomicInteger(pageAcc);
+      flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+    }
 
     // construct first page if file to init
     if (lastPageIndex < 0) {
@@ -164,13 +180,20 @@ public abstract class PageManager implements IPageManager 
{
       child = entry.getValue();
       if (!child.isMeasurement()) {
         alias = null;
-        if (SchemaFile.getNodeAddress(child) < 0) {
+
+        if (SchemaFile.getNodeAddress(child) >= 0) {
+          // new child with a valid segment address, weird
+          throw new MetadataException(
+              String.format(
+                  "A child [%s] in newChildBuffer shall not have 
segmentAddress.",
+                  child.getFullPath()));
+        }
+
+        // pre-allocate except that child is a device node using template
+        if (!(child.isDevice() && child.getAsDeviceMNode().isUseTemplate())) {
           short estSegSize = estimateSegmentSize(child);
           long glbIndex = preAllocateSegment(estSegSize);
           SchemaFile.setNodeAddress(child, glbIndex);
-        } else {
-          // new child with a valid segment address, weird
-          throw new MetadataException("A child in newChildBuffer shall not 
have segmentAddress.");
         }
       } else {
         alias =
@@ -405,13 +428,12 @@ public abstract class PageManager implements IPageManager 
{
     return lastPageIndex.get();
   }
 
-  @Override
-  public void flushDirtyPages() throws IOException {
-    if (dirtyPages.size() == 0) {
-      return;
-    }
+  @FunctionalInterface
+  interface FlushPageStrategy {
+    void apply() throws IOException;
+  }
 
-    // TODO: better performance expected while ensuring integrity when 
exception interrupts
+  private void flushDirtyPagesWithLogging() throws IOException {
     if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
       logWriter = logWriter.renew();
       logCounter.set(0);
@@ -428,6 +450,21 @@ public abstract class PageManager implements IPageManager {
       page.flushPageToChannel(channel);
     }
     logWriter.commit();
+  }
+
+  private void flushDirtyPagesWithoutLogging() throws IOException {
+    for (ISchemaPage page : dirtyPages.values()) {
+      page.syncPageBuffer();
+      page.flushPageToChannel(channel);
+    }
+  }
+
+  @Override
+  public void flushDirtyPages() throws IOException {
+    if (dirtyPages.size() == 0) {
+      return;
+    }
+    flushDirtyPagesStrategy.apply();
     dirtyPages.clear();
     Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
   }
@@ -438,7 +475,7 @@ public abstract class PageManager implements IPageManager {
     Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
     pageInstCache.clear();
     lastPageIndex.set(0);
-    logWriter = logWriter.renew();
+    logWriter = logWriter == null ? null : logWriter.renew();
   }
 
   @Override
@@ -454,7 +491,9 @@ public abstract class PageManager implements IPageManager {
 
   @Override
   public void close() throws IOException {
-    logWriter.close();
+    if (logWriter != null) {
+      logWriter.close();
+    }
   }
 
   // endregion
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index b82cc357fea..71baa0cdc47 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
@@ -73,6 +75,13 @@ public class SchemaFileLogTest {
 
   @Test
   public void essentialLogTest() throws IOException, MetadataException {
+    // select SIMPLE consensus to trigger logging
+    String previousConsensus =
+        
IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS);
+
     SchemaFile sf =
         (SchemaFile) SchemaFile.initSchemaFile("root.test.vRoot1", 
TEST_SCHEMA_REGION_ID);
     IDatabaseMNode<ICachedMNode> newSGNode =
@@ -163,5 +172,9 @@ public class SchemaFileLogTest {
     }
     Assert.assertEquals(cnt, cnt2);
     sf.close();
+
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setSchemaRegionConsensusProtocolClass(previousConsensus);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
index 59f73a6637f..2cba2fdf6d4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
@@ -55,17 +55,6 @@ public class SchemaRegionTemplateTest extends 
AbstractSchemaRegionTest {
   @Test
   public void testActivateSchemaTemplate() throws Exception {
     ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
-    schemaRegion.createTimeseries(
-        SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
-            new PartialPath("root.sg.wf01.wt01.status"),
-            TSDataType.BOOLEAN,
-            TSEncoding.PLAIN,
-            CompressionType.SNAPPY,
-            null,
-            null,
-            null,
-            null),
-        -1);
     int templateId = 1;
     Template template =
         new Template(
@@ -115,17 +104,6 @@ public class SchemaRegionTemplateTest extends 
AbstractSchemaRegionTest {
   @Test
   public void testDeactivateTemplate() throws Exception {
     ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
-    schemaRegion.createTimeseries(
-        SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
-            new PartialPath("root.sg.wf01.wt01.status"),
-            TSDataType.BOOLEAN,
-            TSEncoding.PLAIN,
-            CompressionType.SNAPPY,
-            null,
-            null,
-            null,
-            null),
-        -1);
     int templateId = 1;
     Template template =
         new Template(
@@ -186,10 +164,6 @@ public class SchemaRegionTemplateTest extends 
AbstractSchemaRegionTest {
             Arrays.asList(TSEncoding.RLE, TSEncoding.RLE),
             Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY));
     template.setId(templateId);
-    schemaRegion.activateSchemaTemplate(
-        SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
-            new PartialPath("root.sg.wf01.wt01"), 3, templateId),
-        template);
     schemaRegion.activateSchemaTemplate(
         SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
             new PartialPath("root.sg.wf02"), 2, templateId),
@@ -197,8 +171,6 @@ public class SchemaRegionTemplateTest extends 
AbstractSchemaRegionTest {
     Map<Integer, Template> templateMap = Collections.singletonMap(templateId, 
template);
     List<String> expectedTimeseries =
         Arrays.asList(
-            "root.sg.wf01.wt01.s1",
-            "root.sg.wf01.wt01.s2",
             "root.sg.wf01.wt01.status",
             "root.sg.wf01.wt01.temperature",
             "root.sg.wf02.s1",
@@ -239,22 +211,10 @@ public class SchemaRegionTemplateTest extends 
AbstractSchemaRegionTest {
             new PartialPath("root.db.d1"), 3, templateId),
         template);
 
-    schemaRegion.createTimeseries(
-        SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
-            new PartialPath("root.db.d1.s3"),
-            TSDataType.BOOLEAN,
-            TSEncoding.PLAIN,
-            CompressionType.SNAPPY,
-            null,
-            null,
-            null,
-            null),
-        -1);
-
     Assert.assertEquals(
         0, SchemaRegionTestUtil.deleteTimeSeries(schemaRegion, new 
PartialPath("root.db.d1.s1")));
     Assert.assertEquals(
-        1, SchemaRegionTestUtil.deleteTimeSeries(schemaRegion, new 
PartialPath("root.db.d1.s3")));
+        0, SchemaRegionTestUtil.deleteTimeSeries(schemaRegion, new 
PartialPath("root.db.d1.s3")));
 
     Assert.assertEquals(
         1,

Reply via email to