This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 4192bad3af7 [To rel/1.2] Fix lock bug and improve schema file write
policy in PBTree (#11477)
4192bad3af7 is described below
commit 4192bad3af76eb4a1e303bb040b3b5ebf46d144d
Author: Marcos_Zyk <[email protected]>
AuthorDate: Mon Nov 6 16:49:41 2023 +0800
[To rel/1.2] Fix lock bug and improve schema file write policy in PBTree
(#11477)
---
.../schema/source/TimeSeriesSchemaSource.java | 6 ++-
.../mtree/impl/pbtree/CachedMTreeStore.java | 6 +++
.../impl/pbtree/StampedWriterPreferredLock.java | 10 ++--
.../mtree/impl/pbtree/schemafile/SchemaFile.java | 8 ++-
.../pbtree/schemafile/pagemgr/PageManager.java | 58 ++++++++++++++++------
.../mtree/lock/StampedWriterPreferredLockTest.java | 43 ++++++++++++++++
.../mtree/schemafile/SchemaFileLogTest.java | 13 +++++
7 files changed, 123 insertions(+), 21 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 3da6a14e4f6..38cbaa83476 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static
org.apache.iotdb.db.schemaengine.SchemaConstant.ALL_MATCH_PATTERN;
public class TimeSeriesSchemaSource implements
ISchemaSource<ITimeSeriesSchemaInfo> {
@@ -117,7 +118,14 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
@Override
public boolean hasSchemaStatistic(ISchemaRegion schemaRegion) {
- return pathPattern.equals(ALL_MATCH_PATTERN) && (schemaFilter == null);
+ // "<prefix>.**" covers the whole region only if <prefix> is the database itself
+ // or one of its ancestors. Appending "." to both sides keeps a plain string
+ // prefix check from letting root.db.** match database root.db1.
+ return (pathPattern.equals(ALL_MATCH_PATTERN)
+ || (pathPattern.getMeasurement().equals(MULTI_LEVEL_PATH_WILDCARD)
+ && (schemaRegion.getDatabaseFullPath() + ".")
+ .startsWith(pathPattern.getDevice() + ".")))
+ && (schemaFilter == null);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
index 2bf9b3a698f..1103793f4ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/CachedMTreeStore.java
@@ -556,6 +556,8 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
return;
}
}
+
+ long startTime = System.currentTimeMillis();
List<ICachedMNode> nodesToPersist = cacheManager.collectVolatileMNodes();
for (ICachedMNode volatileNode : nodesToPersist) {
try {
@@ -569,6 +571,11 @@ public class CachedMTreeStore implements
IMTreeStore<ICachedMNode> {
}
cacheManager.updateCacheStatusAfterPersist(volatileNode);
}
if (updatedStorageGroupMNode != null || !nodesToPersist.isEmpty()) {
+ // Report only flushes that persisted something; timing excludes the callback.
+ logger.info(
+ "It takes {}ms to flush MTree in SchemaRegion {}",
+ (System.currentTimeMillis() - startTime),
+ regionStatistics.getSchemaRegionId());
flushCallback.run();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
index 66ef3ab2adb..c48624c6a3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/StampedWriterPreferredLock.java
@@ -50,12 +50,12 @@ public class StampedWriterPreferredLock {
private final Lock lock = new ReentrantLock();
private final Condition okToRead = lock.newCondition();
private final Condition okToWrite = lock.newCondition();
- private long stampAllocator = 0;
+ private volatile long stampAllocator = 0;
private final Map<Long, Integer> readCnt = new HashMap<>();
- private int readWait = 0;
- private int writeCnt = 0;
- private int writeWait = 0;
+ private volatile int readWait = 0; // NOTE(review): volatile does not make readWait++ atomic
+ private volatile int writeCnt = 0;
+ private volatile int writeWait = 0;
private final ThreadLocal<Long> sharedOwnerStamp = new ThreadLocal<>();
/**
@@ -115,7 +115,7 @@ public class StampedWriterPreferredLock {
* @return read lock stamp
*/
private long acquireReadLockStamp(boolean prior) {
- if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
+ while ((prior ? writeCnt : writeCnt + writeWait) > 0) { // re-check on every wake-up
readWait++;
try {
okToRead.await();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 989720f664b..d47520538b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
import org.apache.iotdb.db.schemaengine.SchemaConstant;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -141,7 +143,11 @@ public class SchemaFile implements ISchemaFile {
return new SchemaFile(
sgName,
schemaRegionId,
- !pmtFile.exists(),
+ !pmtFile.exists()
+ || IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS), // NOTE(review): Ratis always sets this flag; confirm intent
CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
false);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 7d58cf6a12d..d4065b94203 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -20,6 +20,8 @@ package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafi
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
@@ -89,6 +91,9 @@ public abstract class PageManager implements IPageManager {
private final AtomicInteger logCounter;
private SchemaFileLogWriter logWriter;
+ // Flush strategy depends on the consensus protocol, which is checked once at init.
+ private final FlushPageStrategy flushDirtyPagesStrategy;
+
PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String
logPath)
throws IOException, MetadataException {
this.pageInstCache =
@@ -107,10 +112,21 @@ public abstract class PageManager implements IPageManager
{
this.pmtFile = pmtFile;
this.readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
- // recover if log exists
- int pageAcc = (int) recoverFromLog(logPath) / SchemaFileConfig.PAGE_LENGTH;
- this.logWriter = new SchemaFileLogWriter(logPath);
- logCounter = new AtomicInteger(pageAcc);
+ if (IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+ // Ratis guarantees integrity via consensus: no log recovery, no log writer
+ logCounter = new AtomicInteger();
+ logWriter = null; // NOTE(review): any existing log file at logPath is left untouched
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithoutLogging;
+ } else {
+ // Without Ratis, recover from the log if present, then log pages on flush
+ int pageAcc = (int) recoverFromLog(logPath) /
SchemaFileConfig.PAGE_LENGTH;
+ this.logWriter = new SchemaFileLogWriter(logPath);
+ logCounter = new AtomicInteger(pageAcc);
+ flushDirtyPagesStrategy = this::flushDirtyPagesWithLogging;
+ }
// construct first page if file to init
if (lastPageIndex < 0) {
@@ -405,29 +421,44 @@ public abstract class PageManager implements IPageManager {
return lastPageIndex.get();
}
- @Override
- public void flushDirtyPages() throws IOException {
- if (dirtyPages.size() == 0) {
- return;
- }
+ /** Persists every page currently held in {@code dirtyPages}. */
+ @FunctionalInterface
+ interface FlushPageStrategy {
+ void apply() throws IOException;
+ }
- // TODO: better performance expected while ensuring integrity when exception interrupts
+ /** Logs all dirty pages, flushes them to the channel, then commits the log. */
+ private void flushDirtyPagesWithLogging() throws IOException {
if (logCounter.get() > SchemaFileConfig.SCHEMA_FILE_LOG_SIZE) {
logWriter = logWriter.renew();
logCounter.set(0);
}
-
logCounter.addAndGet(dirtyPages.size());
for (ISchemaPage page : dirtyPages.values()) {
page.syncPageBuffer();
logWriter.write(page);
}
logWriter.prepare();
-
for (ISchemaPage page : dirtyPages.values()) {
page.flushPageToChannel(channel);
}
logWriter.commit();
+ }
+
+ /** Flushes dirty pages straight to the channel; selected when Ratis is used. */
+ private void flushDirtyPagesWithoutLogging() throws IOException {
+ for (ISchemaPage page : dirtyPages.values()) {
+ page.syncPageBuffer();
+ page.flushPageToChannel(channel);
+ }
+ }
+
+ @Override
+ public void flushDirtyPages() throws IOException {
+ if (dirtyPages.isEmpty()) {
+ return;
+ }
+ flushDirtyPagesStrategy.apply();
dirtyPages.clear();
Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
}
@@ -438,7 +469,9 @@ public abstract class PageManager implements IPageManager {
Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
pageInstCache.clear();
lastPageIndex.set(0);
- logWriter = logWriter.renew();
+ if (logWriter != null) {
+ logWriter = logWriter.renew();
+ }
}
@Override
@@ -454,7 +487,9 @@ public abstract class PageManager implements IPageManager {
@Override
public void close() throws IOException {
- logWriter.close();
+ if (logWriter != null) {
+ logWriter.close();
+ }
}
// endregion
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 2eb8c387896..b005c62605c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -217,4 +217,61 @@ public class StampedWriterPreferredLockTest {
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get()
== 4);
Assert.assertEquals(4, counter.get());
}
+
+ @Test
+ public void testConcurrent() throws InterruptedException {
+ StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
+ Semaphore writerLocked = new Semaphore(0);
+ Semaphore releaseWriter = new Semaphore(0);
+ AtomicInteger counter1 = new AtomicInteger();
+ AtomicInteger counter2 = new AtomicInteger();
+ Thread writer =
+ new Thread(
+ () -> {
+ lock.writeLock();
+ writerLocked.release();
+ try {
+ releaseWriter.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ // Release and immediately re-acquire: a reader woken by this release
+ // must re-check the wait condition rather than enter while the writer
+ // holds the lock again.
+ lock.unlockWrite();
+ lock.writeLock();
+ counter1.incrementAndGet();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ counter2.incrementAndGet();
+ lock.unlockWrite();
+ });
+ Thread releaser =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ releaseWriter.release();
+ });
+ writer.start();
+ releaser.start();
+ // Request the read lock only after the writer holds the write lock, so the
+ // read cannot be granted before the writer's first acquisition.
+ writerLocked.acquire();
+ lock.threadReadLock();
+ try {
+ // The read lock must never be granted between the two increments.
+ Assert.assertEquals(counter2.get(), counter1.get());
+ } finally {
+ lock.threadReadUnlock();
+ }
+ writer.join();
+ releaser.join();
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
index c994471d17b..8b8bb8416b1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileLogTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.SchemaConstant;
import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
@@ -73,6 +75,13 @@ public class SchemaFileLogTest {
@Test
public void essentialLogTest() throws IOException, MetadataException {
+ // select a non-Ratis (SIMPLE) consensus so PageManager uses the physical log
+ String previousConsensus =
+
IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS);
+
SchemaFile sf =
(SchemaFile) SchemaFile.initSchemaFile("root.test.vRoot1",
TEST_SCHEMA_REGION_ID);
IDatabaseMNode<ICachedMNode> newSGNode =
@@ -163,5 +172,9 @@ public class SchemaFileLogTest {
}
Assert.assertEquals(cnt, cnt2);
sf.close();
+ // restore the protocol; NOTE(review): not reached if an assertion above fails
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setSchemaRegionConsensusProtocolClass(previousConsensus);
}
}
}