This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch mtree_checkpoint
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 83df79b0dcb2b6a98371482be7ec89b09cb1d31b
Author: samperson1997 <[email protected]>
AuthorDate: Tue Jun 9 11:44:48 2020 +0800

    [IOTDB-726] CheckPoint of MTree
---
 .../resources/conf/iotdb-engine.properties         |  3 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  4 +-
 .../org/apache/iotdb/db/metadata/MLogWriter.java   | 57 +++++++++----------
 .../org/apache/iotdb/db/metadata/MManager.java     | 59 +++++++++++++++-----
 .../java/org/apache/iotdb/db/metadata/MTree.java   | 65 ++++++++++++++++++----
 .../apache/iotdb/db/metadata/MetadataConstant.java |  7 ++-
 7 files changed, 154 insertions(+), 60 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index bb78b41..dcaa905 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -204,6 +204,9 @@ tag_attribute_total_size=700
 # if enable partial insert, one measurement failure will not impact other 
measurements
 enable_partial_insert=true
 
+# The interval line numbers of mlog.txt when creating a checkpoint and saving 
snapshot of mtree
+mtree_snapshot_interval=100000
+
 ####################
 ### Memory Control Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 23c111a..6b35b4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -568,12 +568,16 @@ public class IoTDBConfig {
   private int primitiveArraySize = 64;
 
   /**
-   * whether enable data partition
-   * if disabled, all data belongs to partition 0
+   * whether enable data partition if disabled, all data belongs to partition 0
    */
   private boolean enablePartition = false;
 
   /**
+   * Interval line number of mlog.txt when creating a checkpoint and saving 
snapshot of mtree
+   */
+  private int mtreeSnapshotInterval = 10;
+
+  /**
    * Time range for partitioning data inside each storage group, the unit is 
second
    */
   private long partitionInterval = 604800;
@@ -628,6 +632,14 @@ public class IoTDBConfig {
     this.enablePartition = enablePartition;
   }
 
+  public int getMtreeSnapshotInterval() {
+    return mtreeSnapshotInterval;
+  }
+
+  public void setMtreeSnapshotInterval(int mtreeSnapshotInterval) {
+    this.mtreeSnapshotInterval = mtreeSnapshotInterval;
+  }
+
   public long getPartitionInterval() {
     return partitionInterval;
   }
@@ -1211,7 +1223,8 @@ public class IoTDBConfig {
     return allocateMemoryForTimeSeriesMetaDataCache;
   }
 
-  public void setAllocateMemoryForTimeSeriesMetaDataCache(long 
allocateMemoryForTimeSeriesMetaDataCache) {
+  public void setAllocateMemoryForTimeSeriesMetaDataCache(
+      long allocateMemoryForTimeSeriesMetaDataCache) {
     this.allocateMemoryForTimeSeriesMetaDataCache = 
allocateMemoryForTimeSeriesMetaDataCache;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 333b2d8..0ef9293 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -315,6 +315,9 @@ public class IoTDBDescriptor {
           Boolean.parseBoolean(properties.getProperty("enable_partial_insert",
               String.valueOf(conf.isEnablePartialInsert()))));
 
+      conf.setMtreeSnapshotInterval(Integer.parseInt(properties.getProperty(
+          "mtree_snapshot_interval", 
Integer.toString(conf.getMtreeSnapshotInterval()))));
+
       conf.setEnablePerformanceStat(Boolean
           .parseBoolean(properties.getProperty("enable_performance_stat",
               Boolean.toString(conf.isEnablePerformanceStat())).trim()));
@@ -428,7 +431,6 @@ public class IoTDBDescriptor {
       //if using org.apache.iotdb.db.auth.authorizer.OpenIdAuthorizer, 
openID_url is needed.
       conf.setOpenIdProviderUrl(properties.getProperty("openID_url", ""));
 
-
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance().getConfig()
           .setTSFileStorageFs(FSType.valueOf(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
index 72ee54b..4a9e834 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
@@ -35,6 +35,7 @@ public class MLogWriter {
 
   private static final Logger logger = 
LoggerFactory.getLogger(MLogWriter.class);
   private BufferedWriter writer;
+  private int lineNumber;
 
   public MLogWriter(String schemaDir, String logFileName) throws IOException {
     File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
@@ -47,21 +48,18 @@ public class MLogWriter {
     }
 
     File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + 
File.separator + logFileName);
-
-    FileWriter fileWriter;
-    fileWriter = new FileWriter(logFile, true);
+    FileWriter fileWriter = new FileWriter(logFile, true);
     writer = new BufferedWriter(fileWriter);
   }
 
-
   public void close() throws IOException {
     writer.close();
   }
 
-  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws 
IOException {
+  public int createTimeseries(CreateTimeSeriesPlan plan, long offset) throws 
IOException {
     writer.write(String.format("%s,%s,%s,%s,%s", 
MetadataOperationType.CREATE_TIMESERIES,
-        plan.getPath().getFullPath(), plan.getDataType().serialize(), 
plan.getEncoding().serialize(),
-        plan.getCompressor().serialize()));
+        plan.getPath().getFullPath(), plan.getDataType().serialize(),
+        plan.getEncoding().serialize(), plan.getCompressor().serialize()));
 
     writer.write(",");
     if (plan.getProps() != null) {
@@ -86,44 +84,37 @@ public class MLogWriter {
       writer.write(String.valueOf(offset));
     }
 
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
-  public void deleteTimeseries(String path) throws IOException {
+  public int deleteTimeseries(String path) throws IOException {
     writer.write(MetadataOperationType.DELETE_TIMESERIES + "," + path);
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
-  public void setStorageGroup(String storageGroup) throws IOException {
+  public int setStorageGroup(String storageGroup) throws IOException {
     writer.write(MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup);
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
-  public void deleteStorageGroup(String storageGroup) throws IOException {
+  public int deleteStorageGroup(String storageGroup) throws IOException {
     writer.write(MetadataOperationType.DELETE_STORAGE_GROUP + "," + 
storageGroup);
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
-  public void setTTL(String storageGroup, long ttl) throws IOException {
+  public int setTTL(String storageGroup, long ttl) throws IOException {
     writer.write(String.format("%s,%s,%s", MetadataOperationType.SET_TTL, 
storageGroup, ttl));
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
-  public void changeOffset(String path, long offset) throws IOException {
+  public int changeOffset(String path, long offset) throws IOException {
     writer.write(String.format("%s,%s,%s", 
MetadataOperationType.CHANGE_OFFSET, path, offset));
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
-  public void changeAlias(String path, String alias) throws IOException {
+  public int changeAlias(String path, String alias) throws IOException {
     writer.write(String.format("%s,%s,%s", MetadataOperationType.CHANGE_ALIAS, 
path, alias));
-    writer.newLine();
-    writer.flush();
+    return newLine();
   }
 
   public static void upgradeMLog(String schemaDir, String logFileName) throws 
IOException {
@@ -158,7 +149,6 @@ public class MLogWriter {
         writer.write(buf.toString());
         writer.newLine();
         writer.flush();
-        
       }
     }
 
@@ -166,9 +156,16 @@ public class MLogWriter {
     if (!logFile.delete()) {
       throw new IOException("Deleting " + logFile + "failed.");
     }
-    
+
     // rename tmpLogFile to mlog
     FSFactoryProducer.getFSFactory().moveFile(tmpLogFile, logFile);
   }
-  
+
+  private int newLine() throws IOException {
+    writer.newLine();
+    writer.flush();
+
+    // Every MTREE_SNAPSHOT_INTERVAL lines, create a checkpoint and save the 
MTree as a snapshot
+    return lineNumber++;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 7a857a6..2df29d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -83,11 +83,13 @@ public class MManager {
 
   private static final Logger logger = LoggerFactory.getLogger(MManager.class);
   private static final String TIME_SERIES_TREE_HEADER = "===  Timeseries Tree  
===\n\n";
+  private final int MTREE_SNAPSHOT_INTERVAL;
 
   // the lock for read/insert
   private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   // the log file seriesPath
   private String logFilePath;
+  private String mtreeSnapshotPath;
   private MTree mtree;
   private MLogWriter logWriter;
   private TagLogFile tagLogFile;
@@ -117,6 +119,7 @@ public class MManager {
 
   private MManager() {
     config = IoTDBDescriptor.getInstance().getConfig();
+    MTREE_SNAPSHOT_INTERVAL = config.getMtreeSnapshotInterval();
     String schemaDir = config.getSchemaDir();
     File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
     if (!schemaFolder.exists()) {
@@ -127,6 +130,7 @@ public class MManager {
       }
     }
     logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
+    mtreeSnapshotPath = schemaDir + File.separator + 
MetadataConstant.MTREE_SNAPSHOT;
 
     // do not write log when recover
     isRecovering = true;
@@ -199,11 +203,16 @@ public class MManager {
 
   private void initFromLog(File logFile) throws IOException {
     // init the metadata from the operation log
-    mtree = new MTree();
+    mtree = MTree.deserializeFrom(mtreeSnapshotPath);
     if (logFile.exists()) {
       try (FileReader fr = new FileReader(logFile);
           BufferedReader br = new BufferedReader(fr)) {
         String cmd;
+        int idx = 0;
+        while (idx < mtree.getSnapshotlineNumber()) {
+          br.readLine();
+          idx++;
+        }
         while ((cmd = br.readLine()) != null) {
           try {
             operation(cmd);
@@ -358,7 +367,10 @@ public class MManager {
             || (plan.getAttributes() != null && 
!plan.getAttributes().isEmpty())) {
           offset = tagLogFile.write(plan.getTags(), plan.getAttributes());
         }
-        logWriter.createTimeseries(plan, offset);
+        int logLineNumber = logWriter.createTimeseries(plan, offset);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
       }
       leafMNode.setOffset(offset);
 
@@ -425,7 +437,10 @@ public class MManager {
             if (emptyStorageGroup != null) {
               
StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(emptyStorageGroup);
             }
-            logWriter.deleteTimeseries(p);
+            int logLineNumber = logWriter.deleteTimeseries(p);
+            if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+              mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+            }
           }
         } catch (DeleteFailedException e) {
           failedNames.add(e.getName());
@@ -441,9 +456,6 @@ public class MManager {
 
   /**
    * remove the node from the tag inverted index
-   *
-   * @param node
-   * @throws IOException
    */
   private void removeFromTagInvertedIndex(LeafMNode node) throws IOException {
     if (node.getOffset() < 0) {
@@ -525,7 +537,10 @@ public class MManager {
         seriesNumberInStorageGroups.put(storageGroup, 0);
       }
       if (!isRecovering) {
-        logWriter.setStorageGroup(storageGroup);
+        int logLineNumber = logWriter.setStorageGroup(storageGroup);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
       }
     } catch (IOException e) {
       throw new MetadataException(e.getMessage());
@@ -569,7 +584,10 @@ public class MManager {
         }
         // if success
         if (!isRecovering) {
-          logWriter.deleteStorageGroup(storageGroup);
+          int logLineNumber = logWriter.deleteStorageGroup(storageGroup);
+          if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+            mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+          }
         }
       }
     } catch (ConfigAdjusterException e) {
@@ -1078,7 +1096,10 @@ public class MManager {
     try {
       getStorageGroupNode(storageGroup).setDataTTL(dataTTL);
       if (!isRecovering) {
-        logWriter.setTTL(storageGroup, dataTTL);
+        int logLineNumber = logWriter.setTTL(storageGroup, dataTTL);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
       }
     } finally {
       lock.writeLock().unlock();
@@ -1164,7 +1185,10 @@ public class MManager {
         leafMNode.getParent().addAlias(alias, leafMNode);
         leafMNode.setAlias(alias);
         // persist to WAL
-        logWriter.changeAlias(fullPath, alias);
+        int logLineNumber = logWriter.changeAlias(fullPath, alias);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
       }
 
       if (tagsMap == null && attributesMap == null) {
@@ -1173,7 +1197,10 @@ public class MManager {
       // no tag or attribute, we need to add a new record in log
       if (leafMNode.getOffset() < 0) {
         long offset = tagLogFile.write(tagsMap, attributesMap);
-        logWriter.changeOffset(fullPath, offset);
+        int logLineNumber = logWriter.changeOffset(fullPath, offset);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
         leafMNode.setOffset(offset);
         // update inverted Index map
         if (tagsMap != null) {
@@ -1242,7 +1269,10 @@ public class MManager {
       // no tag or attribute, we need to add a new record in log
       if (leafMNode.getOffset() < 0) {
         long offset = tagLogFile.write(Collections.emptyMap(), attributesMap);
-        logWriter.changeOffset(fullPath, offset);
+        int logLineNumber = logWriter.changeOffset(fullPath, offset);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
         leafMNode.setOffset(offset);
         return;
       }
@@ -1285,7 +1315,10 @@ public class MManager {
       // no tag or attribute, we need to add a new record in log
       if (leafMNode.getOffset() < 0) {
         long offset = tagLogFile.write(tagsMap, Collections.emptyMap());
-        logWriter.changeOffset(fullPath, offset);
+        int logLineNumber = logWriter.changeOffset(fullPath, offset);
+        if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) {
+          mtree.serializeTo(mtreeSnapshotPath, logLineNumber);
+        }
         leafMNode.setOffset(offset);
         // update inverted Index map
         for (Entry<String, String> entry : tagsMap.entrySet()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 6b944ec..b2ff6f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -24,6 +24,12 @@ import static 
org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -40,6 +46,7 @@ import java.util.TreeSet;
 import java.util.regex.Pattern;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -59,6 +66,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The hierarchical struct of the Metadata Tree is implemented in this class.
@@ -66,12 +75,15 @@ import 
org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public class MTree implements Serializable {
 
   private static final long serialVersionUID = -4200394435237291964L;
+  private static final Logger logger = LoggerFactory.getLogger(MTree.class);
+
   private MNode root;
+  private int snapshotlineNumber;
 
-  private transient ThreadLocal<Integer> limit = new ThreadLocal<>();
-  private transient ThreadLocal<Integer> offset = new ThreadLocal<>();
-  private transient ThreadLocal<Integer> count = new ThreadLocal<>();
-  private transient ThreadLocal<Integer> curOffset = new ThreadLocal<>();
+  private transient static ThreadLocal<Integer> limit = new ThreadLocal<>();
+  private transient static ThreadLocal<Integer> offset = new ThreadLocal<>();
+  private transient static ThreadLocal<Integer> count = new ThreadLocal<>();
+  private transient static ThreadLocal<Integer> curOffset = new 
ThreadLocal<>();
 
   MTree() {
     this.root = new InternalMNode(null, IoTDBConstant.PATH_ROOT);
@@ -368,13 +380,6 @@ public class MTree implements Serializable {
   }
 
   /**
-   * Get device node, if the give path is not a device, throw exception
-   */
-  MNode getDeviceNode(String path) throws MetadataException {
-    return getNodeByPath(path);
-  }
-
-  /**
    * Get node by the path
    *
    * @return last node in given seriesPath
@@ -612,6 +617,7 @@ public class MTree implements Serializable {
 
   /**
    * Traverse the MTree to get the count of timeseries in the given level.
+   *
    * @param targetLevel Record the distance to the target level, 0 means the 
target level.
    */
   private int getCountInGivenLevel(MNode node, int targetLevel) {
@@ -847,6 +853,7 @@ public class MTree implements Serializable {
 
   /**
    * Get all paths under the given level.
+   *
    * @param targetLevel Record the distance to the target level, 0 means the 
target level.
    */
   private void findNodes(MNode node, String path, List<String> res, int 
targetLevel) {
@@ -864,6 +871,42 @@ public class MTree implements Serializable {
     }
   }
 
+  public int getSnapshotlineNumber() {
+    return snapshotlineNumber;
+  }
+
+  public void setSnapshotlineNumber(int snapshotlineNumber) {
+    this.snapshotlineNumber = snapshotlineNumber;
+  }
+
+  public void serializeTo(String snapshotPath, int lineNumber) throws 
IOException {
+    this.setSnapshotlineNumber(lineNumber);
+    ObjectOutputStream os = new ObjectOutputStream(
+        new 
FileOutputStream(SystemFileFactory.INSTANCE.getFile(snapshotPath)));
+    os.writeObject(this);
+    os.close();
+  }
+
+  public static MTree deserializeFrom(String mtreeSnapshotPath) throws 
IOException {
+    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
+    if (!mtreeSnapshot.exists()) {
+      return new MTree();
+    }
+    ObjectInputStream oi = new ObjectInputStream(new 
FileInputStream(mtreeSnapshot));
+    MTree mtree = null;
+    try {
+      mtree = (MTree) oi.readObject();
+    } catch (ClassNotFoundException e) {
+      logger.error("Failed to deserialize MTree from mtree.snapshot. ", e);
+    }
+    oi.close();
+    limit = new ThreadLocal<>();
+    offset = new ThreadLocal<>();
+    count = new ThreadLocal<>();
+    curOffset = new ThreadLocal<>();
+    return mtree;
+  }
+
   @Override
   public String toString() {
     JSONObject jsonObject = new JSONObject();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java
index ee096bf..d7d8b5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.metadata;
 
 public class MetadataConstant {
-  private MetadataConstant(){
-    //allowed to do nothing
+
+  private MetadataConstant() {
+    // allowed to do nothing
   }
+
   public static final String ROOT = "root";
   public static final String METADATA_LOG = "mlog.txt";
   public static final String TAG_LOG = "tlog.txt";
+  public static final String MTREE_SNAPSHOT = "mtree.snapshot";
 }

Reply via email to