This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ee93ad4145 [IOTDB-4960]SchemaFile implements separate FileChannel for read/write operation (#8813)
ee93ad4145 is described below
commit ee93ad4145cf2cd5785b314a8ea334bca502c2f3
Author: ZhaoXin <[email protected]>
AuthorDate: Wed Jan 11 19:01:38 2023 +0800
[IOTDB-4960]SchemaFile implements separate FileChannel for read/write operation (#8813)
---
.../mtree/store/disk/schemafile/SchemaFile.java | 21 ++++++++++++++-------
.../disk/schemafile/pagemgr/BTreePageManager.java | 5 +++--
.../store/disk/schemafile/pagemgr/PageManager.java | 17 ++++++++++++++---
3 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
index b508dac0d0..e837b06269 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
@@ -77,6 +77,7 @@ public class SchemaFile implements ISchemaFile {
private File pmtFile;
private FileChannel channel;
+ // todo refactor constructor for schema file in Jan.
private SchemaFile(
String sgName, int schemaRegionId, boolean override, long ttl, boolean
isEntity)
throws IOException, MetadataException {
@@ -186,7 +187,7 @@ public class SchemaFile implements ISchemaFile {
this.dataTTL = sgNode.getDataTTL();
this.isEntity = sgNode.isEntity();
this.sgNodeTemplateIdWithState = sgNode.getSchemaTemplateIdWithState();
- updateHeader();
+ updateHeaderBuffer();
return true;
}
@@ -220,6 +221,7 @@ public class SchemaFile implements ISchemaFile {
pageManager.writeNewChildren(node);
pageManager.writeUpdatedChildren(node);
pageManager.flushDirtyPages();
+ updateHeaderBuffer();
}
@Override
@@ -240,16 +242,18 @@ public class SchemaFile implements ISchemaFile {
@Override
public void close() throws IOException {
- updateHeader();
+ updateHeaderBuffer();
pageManager.flushDirtyPages();
pageManager.close();
+ forceChannel();
channel.close();
}
@Override
public void sync() throws IOException {
- updateHeader();
+ updateHeaderBuffer();
pageManager.flushDirtyPages();
+ forceChannel();
}
@Override
@@ -327,7 +331,7 @@ public class SchemaFile implements ISchemaFile {
ReadWriteIOUtils.write(sgNodeTemplateIdWithState, headerContent);
ReadWriteIOUtils.write(SchemaFileConfig.SCHEMA_FILE_VERSION,
headerContent);
lastSGAddr = 0L;
- pageManager = new BTreePageManager(channel, -1, logPath);
+ pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
} else {
channel.read(headerContent);
headerContent.clear();
@@ -342,11 +346,11 @@ public class SchemaFile implements ISchemaFile {
throw new MetadataException("SchemaFile with wrong version, please
check or upgrade.");
}
- pageManager = new BTreePageManager(channel, lastPageIndex, logPath);
+ pageManager = new BTreePageManager(channel, pmtFile, lastPageIndex,
logPath);
}
}
- private void updateHeader() throws IOException {
+ private void updateHeaderBuffer() throws IOException {
headerContent.clear();
ReadWriteIOUtils.write(pageManager.getLastPageIndex(), headerContent);
@@ -356,8 +360,11 @@ public class SchemaFile implements ISchemaFile {
ReadWriteIOUtils.write(lastSGAddr, headerContent);
ReadWriteIOUtils.write(SchemaFileConfig.SCHEMA_FILE_VERSION,
headerContent);
- headerContent.clear();
+ headerContent.flip();
channel.write(headerContent, 0);
+ }
+
+ private void forceChannel() throws IOException {
channel.force(true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java
index a1e4e89cb7..7474e2313a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISchemaPage;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISegmentedPage;
import
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -40,9 +41,9 @@ import static
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFil
public class BTreePageManager extends PageManager {
- public BTreePageManager(FileChannel channel, int lastPageIndex, String
logPath)
+ public BTreePageManager(FileChannel channel, File pmtFile, int
lastPageIndex, String logPath)
throws IOException, MetadataException {
- super(channel, lastPageIndex, logPath);
+ super(channel, pmtFile, lastPageIndex, logPath);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
index 786a5e796a..3dee4a1111 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
@@ -35,11 +35,13 @@ import
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.log.SchemaFileLo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -89,10 +91,14 @@ public abstract class PageManager implements IPageManager {
private final FileChannel channel;
+ // handle timeout interruption during reading
+ private File pmtFile;
+ private FileChannel readChannel;
+
private final AtomicInteger logCounter;
private SchemaFileLogWriter logWriter;
- PageManager(FileChannel channel, int lastPageIndex, String logPath)
+ PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String
logPath)
throws IOException, MetadataException {
this.pageInstCache = Collections.synchronizedMap(new
LinkedHashMap<>(PAGE_CACHE_SIZE, 1, true));
this.dirtyPages = new ConcurrentHashMap<>();
@@ -102,6 +108,8 @@ public abstract class PageManager implements IPageManager {
lastPageIndex >= 0 ? new AtomicInteger(lastPageIndex) : new
AtomicInteger(0);
this.treeTrace = new int[16];
this.channel = channel;
+ this.pmtFile = pmtFile;
+ this.readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
// recover if log exists
int pageAcc = (int) recoverFromLog(logPath) / PAGE_LENGTH;
@@ -555,9 +563,12 @@ public abstract class PageManager implements IPageManager {
return page;
}
- private int loadFromFile(ByteBuffer dst, int pageIndex) throws IOException {
+ private synchronized int loadFromFile(ByteBuffer dst, int pageIndex) throws
IOException {
dst.clear();
- return channel.read(dst, getPageAddress(pageIndex));
+ if (!readChannel.isOpen()) {
+ readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
+ }
+ return readChannel.read(dst, getPageAddress(pageIndex));
}
private void updateParentalRecord(IMNode parent, String key, long newSegAddr)