This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0b8faac5f75 PBTree: Logging Disabled with Ratis and Pre-Allocation Disabled for devices using template (#11467)
0b8faac5f75 is described below
commit 0b8faac5f75ab8a7e63bde5c6dbdffb3fbc2d49e
Author: ZhaoXin <[email protected]>
AuthorDate: Tue Nov 7 19:21:09 2023 +0800
PBTree: Logging Disabled with Ratis and Pre-Allocation Disabled for devices using template (#11467)
---
.../mtree/impl/pbtree/schemafile/SchemaFile.java | 35 ++++++++---
.../schemafile/pagemgr/BTreePageManager.java | 6 +-
.../pbtree/schemafile/pagemgr/PageManager.java | 71 +++++++++++++++++-----
.../mtree/schemafile/SchemaFileLogTest.java | 13 ++++
.../schemaRegion/SchemaRegionTemplateTest.java | 42 +------------
5 files changed, 102 insertions(+), 65 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index c4781991eba..c703e641345 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -141,7 +143,11 @@ public class SchemaFile implements ISchemaFile {
return new SchemaFile(
sgName,
schemaRegionId,
- !pmtFile.exists(),
+ !pmtFile.exists()
+ || IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS),
CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
false);
}
@@ -223,11 +229,18 @@ public class SchemaFile implements ISchemaFile {
setNodeAddress(node, lastSGAddr);
} else {
if (curSegAddr < 0L) {
+ if (node.isDevice() && node.getAsDeviceMNode().isUseTemplate()) {
+ throw new MetadataException(
+ String.format(
+ "Adding or updating children of device using template [%s]
is NOT allowed.",
+ node.getFullPath()));
+ }
+
// now only 32 bits page index is allowed
throw new MetadataException(
String.format(
- "Cannot store a node with segment address [%s] except for
StorageGroupNode.",
- curSegAddr));
+ "Cannot flush any node with negative address [%s] except for
DatabaseNode.",
+ node.getFullPath()));
}
}
@@ -470,11 +483,19 @@ public class SchemaFile implements ISchemaFile {
File schemaFile =
SystemFileFactory.INSTANCE.getFile(
getDirPath(sgName, schemaRegionId),
SchemaConstant.PBTREE_FILE_NAME);
- File schemaLogFile =
- SystemFileFactory.INSTANCE.getFile(
- getDirPath(sgName, schemaRegionId),
SchemaConstant.PBTREE_LOG_FILE_NAME);
Files.deleteIfExists(schemaFile.toPath());
- Files.deleteIfExists(schemaLogFile.toPath());
+
+ if (!IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ // schemaFileLog disabled with RATIS consensus
+ File schemaLogFile =
+ SystemFileFactory.INSTANCE.getFile(
+ getDirPath(sgName, schemaRegionId),
SchemaConstant.PBTREE_LOG_FILE_NAME);
+ Files.deleteIfExists(schemaLogFile.toPath());
+ }
+
Files.copy(snapshot.toPath(), schemaFile.toPath());
return new SchemaFile(
sgName,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
index 43c60624d4a..e80c4dfb1dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/BTreePageManager.java
@@ -303,7 +303,8 @@ public class BTreePageManager extends PageManager {
tarPage.getAsSegmentedPage().removeRecord(getSegIndex(recSegAddr),
node.getName());
// remove segments belongs to node
- if (!node.isMeasurement()) {
+ if (!node.isMeasurement() && getNodeAddress(node) > 0) {
+ // node with maliciously modified address may result in orphan pages
long delSegAddr = getNodeAddress(node);
tarPage = getPageInstance(getPageIndex(delSegAddr));
@@ -317,6 +318,9 @@ public class BTreePageManager extends PageManager {
}
if (tarPage.getAsInternalPage() != null) {
+ // If the deleted one points to an Internal (root of BTree), there are two BTrees to handle:
+ // one mapping node names to record buffers, and another mapping aliases to names. </br>
+ // All of those are turned into SegmentedPage.
Deque<Integer> cascadePages = new
ArrayDeque<>(tarPage.getAsInternalPage().getAllRecords());
cascadePages.add(tarPage.getPageIndex());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 7d58cf6a12d..eb36b6d2be7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -20,6 +20,8 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -89,6 +91,9 @@ public abstract class PageManager implements IPageManager {
private final AtomicInteger logCounter;
private SchemaFileLogWriter logWriter;
+ // flush strategy is dependent on consensus protocol, only check protocol on init
+ protected FlushPageStrategy flushDirtyPagesStrategy;
+
PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String
logPath)
throws IOException, MetadataException {
this.pageInstCache =
@@ -107,10 +112,21 @@ public abstract class PageManager implements IPageManager
{
this.pmtFile = pmtFile;
this.readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
- // recover if log exists
- int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
- this.logWriter = new SchemaFileLogWriter(logPath);
- logCounter = new AtomicInteger(pageAcc);
+ if (IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ // with RATIS enabled, integrity is guaranteed by consensus protocol
+ logCounter = new AtomicInteger();
+ logWriter = null;
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+ } else {
+ // without RATIS, utilize physical logging for integrity
+ int pageAcc = (int) recoverFromLog(logPath) /
SchemaFileConfig.PAGE_LENGTH;
+ this.logWriter = new SchemaFileLogWriter(logPath);
+ logCounter = new AtomicInteger(pageAcc);
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+ }
// construct first page if file to init
if (lastPageIndex < 0) {
@@ -164,13 +180,20 @@ public abstract class PageManager implements IPageManager
{
child = entry.getValue();
if (!child.isMeasurement()) {
alias = null;
- if (SchemaFile.getNodeAddress(child) < 0) {
+
+ if (SchemaFile.getNodeAddress(child) >= 0) {
+ // new child with a valid segment address, weird
+ throw new MetadataException(
+ String.format(
+ "A child [%s] in newChildBuffer shall not have
segmentAddress.",
+ child.getFullPath()));
+ }
+
+ // pre-allocate except that child is a device node using template
+ if (!(child.isDevice() && child.getAsDeviceMNode().isUseTemplate())) {
short estSegSize = estimateSegmentSize(child);
long glbIndex = preAllocateSegment(estSegSize);
SchemaFile.setNodeAddress(child, glbIndex);
- } else {
- // new child with a valid segment address, weird
- throw new MetadataException("A child in newChildBuffer shall not
have segmentAddress.");
}
} else {
alias =
@@ -405,13 +428,12 @@ public abstract class PageManager implements IPageManager
{
return lastPageIndex.get();
}
- @Override
- public void flushDirtyPages() throws IOException {
- if (dirtyPages.size() == 0) {
- return;
- }
+ @FunctionalInterface
+ interface FlushPageStrategy {
+ void apply() throws IOException;
+ }
- // TODO: better performance expected while ensuring integrity when
exception interrupts
+ private void flushDirtyPagesWithLogging() throws IOException {
if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
logWriter = logWriter.renew();
logCounter.set(0);
@@ -428,6 +450,21 @@ public abstract class PageManager implements IPageManager {
page.flushPageToChannel(channel);
}
logWriter.commit();
+ }
+
+ private void flushDirtyPagesWithoutLogging() throws IOException {
+ for (ISchemaPage page : dirtyPages.values()) {
+ page.syncPageBuffer();
+ page.flushPageToChannel(channel);
+ }
+ }
+
+ @Override
+ public void flushDirtyPages() throws IOException {
+ if (dirtyPages.size() == 0) {
+ return;
+ }
+ flushDirtyPagesStrategy.apply();
dirtyPages.clear();
Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
}
@@ -438,7 +475,7 @@ public abstract class PageManager implements IPageManager {
Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
pageInstCache.clear();
lastPageIndex.set(0);
- logWriter = logWriter.renew();
+ logWriter = logWriter == null ? null : logWriter.renew();
}
@Override
@@ -454,7 +491,9 @@ public abstract class PageManager implements IPageManager {
@Override
public void close() throws IOException {
- logWriter.close();
+ if (logWriter != null) {
+ logWriter.close();
+ }
}
// endregion
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index b82cc357fea..71baa0cdc47 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
@@ -73,6 +75,13 @@ public class SchemaFileLogTest {
@Test
public void essentialLogTest() throws IOException, MetadataException {
+ // select SIMPLE consensus to trigger logging
+ String previousConsensus =
+
IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS);
+
SchemaFile sf =
(SchemaFile) SchemaFile.initSchemaFile("root.test.vRoot1",
TEST_SCHEMA_REGION_ID);
IDatabaseMNode<ICachedMNode> newSGNode =
@@ -163,5 +172,9 @@ public class SchemaFileLogTest {
}
Assert.assertEquals(cnt, cnt2);
sf.close();
+
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setSchemaRegionConsensusProtocolClass(previousConsensus);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
index 59f73a6637f..2cba2fdf6d4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTemplateTest.java
@@ -55,17 +55,6 @@ public class SchemaRegionTemplateTest extends
AbstractSchemaRegionTest {
@Test
public void testActivateSchemaTemplate() throws Exception {
ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- schemaRegion.createTimeseries(
- SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
- new PartialPath("root.sg.wf01.wt01.status"),
- TSDataType.BOOLEAN,
- TSEncoding.PLAIN,
- CompressionType.SNAPPY,
- null,
- null,
- null,
- null),
- -1);
int templateId = 1;
Template template =
new Template(
@@ -115,17 +104,6 @@ public class SchemaRegionTemplateTest extends
AbstractSchemaRegionTest {
@Test
public void testDeactivateTemplate() throws Exception {
ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
- schemaRegion.createTimeseries(
- SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
- new PartialPath("root.sg.wf01.wt01.status"),
- TSDataType.BOOLEAN,
- TSEncoding.PLAIN,
- CompressionType.SNAPPY,
- null,
- null,
- null,
- null),
- -1);
int templateId = 1;
Template template =
new Template(
@@ -186,10 +164,6 @@ public class SchemaRegionTemplateTest extends
AbstractSchemaRegionTest {
Arrays.asList(TSEncoding.RLE, TSEncoding.RLE),
Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY));
template.setId(templateId);
- schemaRegion.activateSchemaTemplate(
- SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
- new PartialPath("root.sg.wf01.wt01"), 3, templateId),
- template);
schemaRegion.activateSchemaTemplate(
SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
new PartialPath("root.sg.wf02"), 2, templateId),
@@ -197,8 +171,6 @@ public class SchemaRegionTemplateTest extends
AbstractSchemaRegionTest {
Map<Integer, Template> templateMap = Collections.singletonMap(templateId,
template);
List<String> expectedTimeseries =
Arrays.asList(
- "root.sg.wf01.wt01.s1",
- "root.sg.wf01.wt01.s2",
"root.sg.wf01.wt01.status",
"root.sg.wf01.wt01.temperature",
"root.sg.wf02.s1",
@@ -239,22 +211,10 @@ public class SchemaRegionTemplateTest extends
AbstractSchemaRegionTest {
new PartialPath("root.db.d1"), 3, templateId),
template);
- schemaRegion.createTimeseries(
- SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
- new PartialPath("root.db.d1.s3"),
- TSDataType.BOOLEAN,
- TSEncoding.PLAIN,
- CompressionType.SNAPPY,
- null,
- null,
- null,
- null),
- -1);
-
Assert.assertEquals(
0, SchemaRegionTestUtil.deleteTimeSeries(schemaRegion, new
PartialPath("root.db.d1.s1")));
Assert.assertEquals(
- 1, SchemaRegionTestUtil.deleteTimeSeries(schemaRegion, new
PartialPath("root.db.d1.s3")));
+ 0, SchemaRegionTestUtil.deleteTimeSeries(schemaRegion, new
PartialPath("root.db.d1.s3")));
Assert.assertEquals(
1,