This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ee93ad4145 [IOTDB-4960]SchemaFile implements separate FileChannel for read/write operation (#8813)
ee93ad4145 is described below

commit ee93ad4145cf2cd5785b314a8ea334bca502c2f3
Author: ZhaoXin <[email protected]>
AuthorDate: Wed Jan 11 19:01:38 2023 +0800

    [IOTDB-4960]SchemaFile implements separate FileChannel for read/write operation (#8813)
---
 .../mtree/store/disk/schemafile/SchemaFile.java     | 21 ++++++++++++++-------
 .../disk/schemafile/pagemgr/BTreePageManager.java   |  5 +++--
 .../store/disk/schemafile/pagemgr/PageManager.java  | 17 ++++++++++++++---
 3 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
index b508dac0d0..e837b06269 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
@@ -77,6 +77,7 @@ public class SchemaFile implements ISchemaFile {
   private File pmtFile;
   private FileChannel channel;
 
+  // todo refactor constructor for schema file in Jan.
   private SchemaFile(
       String sgName, int schemaRegionId, boolean override, long ttl, boolean 
isEntity)
       throws IOException, MetadataException {
@@ -186,7 +187,7 @@ public class SchemaFile implements ISchemaFile {
     this.dataTTL = sgNode.getDataTTL();
     this.isEntity = sgNode.isEntity();
     this.sgNodeTemplateIdWithState = sgNode.getSchemaTemplateIdWithState();
-    updateHeader();
+    updateHeaderBuffer();
     return true;
   }
 
@@ -220,6 +221,7 @@ public class SchemaFile implements ISchemaFile {
     pageManager.writeNewChildren(node);
     pageManager.writeUpdatedChildren(node);
     pageManager.flushDirtyPages();
+    updateHeaderBuffer();
   }
 
   @Override
@@ -240,16 +242,18 @@ public class SchemaFile implements ISchemaFile {
 
   @Override
   public void close() throws IOException {
-    updateHeader();
+    updateHeaderBuffer();
     pageManager.flushDirtyPages();
     pageManager.close();
+    forceChannel();
     channel.close();
   }
 
   @Override
   public void sync() throws IOException {
-    updateHeader();
+    updateHeaderBuffer();
     pageManager.flushDirtyPages();
+    forceChannel();
   }
 
   @Override
@@ -327,7 +331,7 @@ public class SchemaFile implements ISchemaFile {
       ReadWriteIOUtils.write(sgNodeTemplateIdWithState, headerContent);
       ReadWriteIOUtils.write(SchemaFileConfig.SCHEMA_FILE_VERSION, 
headerContent);
       lastSGAddr = 0L;
-      pageManager = new BTreePageManager(channel, -1, logPath);
+      pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
     } else {
       channel.read(headerContent);
       headerContent.clear();
@@ -342,11 +346,11 @@ public class SchemaFile implements ISchemaFile {
         throw new MetadataException("SchemaFile with wrong version, please 
check or upgrade.");
       }
 
-      pageManager = new BTreePageManager(channel, lastPageIndex, logPath);
+      pageManager = new BTreePageManager(channel, pmtFile, lastPageIndex, 
logPath);
     }
   }
 
-  private void updateHeader() throws IOException {
+  private void updateHeaderBuffer() throws IOException {
     headerContent.clear();
 
     ReadWriteIOUtils.write(pageManager.getLastPageIndex(), headerContent);
@@ -356,8 +360,11 @@ public class SchemaFile implements ISchemaFile {
     ReadWriteIOUtils.write(lastSGAddr, headerContent);
     ReadWriteIOUtils.write(SchemaFileConfig.SCHEMA_FILE_VERSION, 
headerContent);
 
-    headerContent.clear();
+    headerContent.flip();
     channel.write(headerContent, 0);
+  }
+
+  private void forceChannel() throws IOException {
     channel.force(true);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java
index a1e4e89cb7..7474e2313a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/BTreePageManager.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISchemaPage;
 import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISegmentedPage;
 import 
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFileConfig;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -40,9 +41,9 @@ import static 
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFil
 
 public class BTreePageManager extends PageManager {
 
-  public BTreePageManager(FileChannel channel, int lastPageIndex, String 
logPath)
+  public BTreePageManager(FileChannel channel, File pmtFile, int 
lastPageIndex, String logPath)
       throws IOException, MetadataException {
-    super(channel, lastPageIndex, logPath);
+    super(channel, pmtFile, lastPageIndex, logPath);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
index 786a5e796a..3dee4a1111 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
@@ -35,11 +35,13 @@ import 
org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.log.SchemaFileLo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -89,10 +91,14 @@ public abstract class PageManager implements IPageManager {
 
   private final FileChannel channel;
 
+  // handle timeout interruption during reading
+  private File pmtFile;
+  private FileChannel readChannel;
+
   private final AtomicInteger logCounter;
   private SchemaFileLogWriter logWriter;
 
-  PageManager(FileChannel channel, int lastPageIndex, String logPath)
+  PageManager(FileChannel channel, File pmtFile, int lastPageIndex, String 
logPath)
       throws IOException, MetadataException {
     this.pageInstCache = Collections.synchronizedMap(new 
LinkedHashMap<>(PAGE_CACHE_SIZE, 1, true));
     this.dirtyPages = new ConcurrentHashMap<>();
@@ -102,6 +108,8 @@ public abstract class PageManager implements IPageManager {
         lastPageIndex >= 0 ? new AtomicInteger(lastPageIndex) : new 
AtomicInteger(0);
     this.treeTrace = new int[16];
     this.channel = channel;
+    this.pmtFile = pmtFile;
+    this.readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
 
     // recover if log exists
     int pageAcc = (int) recoverFromLog(logPath) / PAGE_LENGTH;
@@ -555,9 +563,12 @@ public abstract class PageManager implements IPageManager {
     return page;
   }
 
-  private int loadFromFile(ByteBuffer dst, int pageIndex) throws IOException {
+  private synchronized int loadFromFile(ByteBuffer dst, int pageIndex) throws 
IOException {
     dst.clear();
-    return channel.read(dst, getPageAddress(pageIndex));
+    if (!readChannel.isOpen()) {
+      readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
+    }
+    return readChannel.read(dst, getPageAddress(pageIndex));
   }
 
   private void updateParentalRecord(IMNode parent, String key, long newSegAddr)

Reply via email to