This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch mtree_checkpoint in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 83df79b0dcb2b6a98371482be7ec89b09cb1d31b Author: samperson1997 <[email protected]> AuthorDate: Tue Jun 9 11:44:48 2020 +0800 [IOTDB-726] CheckPoint of MTree --- .../resources/conf/iotdb-engine.properties | 3 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++++++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +- .../org/apache/iotdb/db/metadata/MLogWriter.java | 57 +++++++++---------- .../org/apache/iotdb/db/metadata/MManager.java | 59 +++++++++++++++----- .../java/org/apache/iotdb/db/metadata/MTree.java | 65 ++++++++++++++++++---- .../apache/iotdb/db/metadata/MetadataConstant.java | 7 ++- 7 files changed, 154 insertions(+), 60 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index bb78b41..dcaa905 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -204,6 +204,9 @@ tag_attribute_total_size=700 # if enable partial insert, one measurement failure will not impact other measurements enable_partial_insert=true +# The interval line numbers of mlog.txt when creating a checkpoint and saving snapshot of mtree +mtree_snapshot_interval=100000 + #################### ### Memory Control Configuration #################### diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 23c111a..6b35b4d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -568,12 +568,16 @@ public class IoTDBConfig { private int primitiveArraySize = 64; /** - * whether enable data partition - * if disabled, all data belongs to partition 0 + * whether enable data partition if disabled, all data belongs to partition 0 */ private boolean enablePartition = false; /** + * Interval line number of mlog.txt when creating a checkpoint and saving snapshot of mtree + */ + private int mtreeSnapshotInterval = 10; + + /** * Time range for partitioning data inside each storage group, the unit is second */ private long partitionInterval = 604800; @@ -628,6 +632,14 @@ public class IoTDBConfig { this.enablePartition = enablePartition; } + public int getMtreeSnapshotInterval() { + return mtreeSnapshotInterval; + } + + public void setMtreeSnapshotInterval(int mtreeSnapshotInterval) { + this.mtreeSnapshotInterval = mtreeSnapshotInterval; + } + public long getPartitionInterval() { return partitionInterval; } @@ -1211,7 +1223,8 @@ public class IoTDBConfig { return allocateMemoryForTimeSeriesMetaDataCache; } - public void setAllocateMemoryForTimeSeriesMetaDataCache(long allocateMemoryForTimeSeriesMetaDataCache) { + public void setAllocateMemoryForTimeSeriesMetaDataCache( + long allocateMemoryForTimeSeriesMetaDataCache) { this.allocateMemoryForTimeSeriesMetaDataCache = allocateMemoryForTimeSeriesMetaDataCache; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 333b2d8..0ef9293 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -315,6 +315,9 @@ public class IoTDBDescriptor { Boolean.parseBoolean(properties.getProperty("enable_partial_insert", String.valueOf(conf.isEnablePartialInsert())))); + conf.setMtreeSnapshotInterval(Integer.parseInt(properties.getProperty( + "mtree_snapshot_interval", Integer.toString(conf.getMtreeSnapshotInterval())))); + conf.setEnablePerformanceStat(Boolean .parseBoolean(properties.getProperty("enable_performance_stat", Boolean.toString(conf.isEnablePerformanceStat())).trim())); @@ -428,7 +431,6 @@ public class IoTDBDescriptor { //if using org.apache.iotdb.db.auth.authorizer.OpenIdAuthorizer, openID_url is needed. conf.setOpenIdProviderUrl(properties.getProperty("openID_url", "")); - // At the same time, set TSFileConfig TSFileDescriptor.getInstance().getConfig() .setTSFileStorageFs(FSType.valueOf( diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java index 72ee54b..4a9e834 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java @@ -35,6 +35,7 @@ public class MLogWriter { private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class); private BufferedWriter writer; + private int lineNumber; public MLogWriter(String schemaDir, String logFileName) throws IOException { File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir); @@ -47,21 +48,18 @@ public class MLogWriter { } File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName); - - FileWriter fileWriter; - fileWriter = new FileWriter(logFile, true); + FileWriter fileWriter = new FileWriter(logFile, true); writer = new BufferedWriter(fileWriter); } - public void close() throws IOException { writer.close(); } - public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException { + public int createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException { writer.write(String.format("%s,%s,%s,%s,%s", MetadataOperationType.CREATE_TIMESERIES, - plan.getPath().getFullPath(), plan.getDataType().serialize(), plan.getEncoding().serialize(), - plan.getCompressor().serialize())); + plan.getPath().getFullPath(), plan.getDataType().serialize(), + plan.getEncoding().serialize(), plan.getCompressor().serialize())); writer.write(","); if (plan.getProps() != null) { @@ -86,44 +84,37 @@ public class MLogWriter { writer.write(String.valueOf(offset)); } - writer.newLine(); - writer.flush(); + return newLine(); } - public void deleteTimeseries(String path) throws IOException { + public int deleteTimeseries(String path) throws IOException { writer.write(MetadataOperationType.DELETE_TIMESERIES + "," + path); - writer.newLine(); - writer.flush(); + return newLine(); } - public void setStorageGroup(String storageGroup) throws IOException { + public int setStorageGroup(String storageGroup) throws IOException { writer.write(MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup); - writer.newLine(); - writer.flush(); + return newLine(); } - public void deleteStorageGroup(String storageGroup) throws IOException { + public int deleteStorageGroup(String storageGroup) throws IOException { writer.write(MetadataOperationType.DELETE_STORAGE_GROUP + "," + storageGroup); - writer.newLine(); - writer.flush(); + return newLine(); } - public void setTTL(String storageGroup, long ttl) throws IOException { + public int setTTL(String storageGroup, long ttl) throws IOException { writer.write(String.format("%s,%s,%s", MetadataOperationType.SET_TTL, storageGroup, ttl)); - writer.newLine(); - writer.flush(); + return newLine(); } - public void changeOffset(String path, long offset) throws IOException { + public int changeOffset(String path, long offset) throws IOException { writer.write(String.format("%s,%s,%s", MetadataOperationType.CHANGE_OFFSET, path, offset)); - writer.newLine(); - writer.flush(); + return newLine(); } - public void changeAlias(String path, String alias) throws IOException { + public int changeAlias(String path, String alias) throws IOException { writer.write(String.format("%s,%s,%s", MetadataOperationType.CHANGE_ALIAS, path, alias)); - writer.newLine(); - writer.flush(); + return newLine(); } public static void upgradeMLog(String schemaDir, String logFileName) throws IOException { @@ -158,7 +149,6 @@ public class MLogWriter { writer.write(buf.toString()); writer.newLine(); writer.flush(); - } } @@ -166,9 +156,16 @@ public class MLogWriter { if (!logFile.delete()) { throw new IOException("Deleting " + logFile + "failed."); } - + // rename tmpLogFile to mlog FSFactoryProducer.getFSFactory().moveFile(tmpLogFile, logFile); } - + + private int newLine() throws IOException { + writer.newLine(); + writer.flush(); + + // Every MTREE_SNAPSHOT_INTERVAL lines, create a checkpoint and save the MTree as a snapshot + return lineNumber++; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 7a857a6..2df29d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -83,11 +83,13 @@ public class MManager { private static final Logger logger = LoggerFactory.getLogger(MManager.class); private static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n"; + private final int MTREE_SNAPSHOT_INTERVAL; // the lock for read/insert private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // the log file seriesPath private String logFilePath; + private String mtreeSnapshotPath; private MTree mtree; private MLogWriter logWriter; private TagLogFile tagLogFile; @@ -117,6 +119,7 @@ public class MManager { private MManager() { config = IoTDBDescriptor.getInstance().getConfig(); + MTREE_SNAPSHOT_INTERVAL = config.getMtreeSnapshotInterval(); String schemaDir = config.getSchemaDir(); File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir); if (!schemaFolder.exists()) { @@ -127,6 +130,7 @@ public class MManager { } } logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG; + mtreeSnapshotPath = schemaDir + File.separator + MetadataConstant.MTREE_SNAPSHOT; // do not write log when recover isRecovering = true; @@ -199,11 +203,16 @@ public class MManager { private void initFromLog(File logFile) throws IOException { // init the metadata from the operation log - mtree = new MTree(); + mtree = MTree.deserializeFrom(mtreeSnapshotPath); if (logFile.exists()) { try (FileReader fr = new FileReader(logFile); BufferedReader br = new BufferedReader(fr)) { String cmd; + int idx = 0; + while (idx < mtree.getSnapshotlineNumber()) { + br.readLine(); + idx++; + } while ((cmd = br.readLine()) != null) { try { operation(cmd); @@ -358,7 +367,10 @@ public class MManager { || (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) { offset = tagLogFile.write(plan.getTags(), plan.getAttributes()); } - logWriter.createTimeseries(plan, offset); + int logLineNumber = logWriter.createTimeseries(plan, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } leafMNode.setOffset(offset); @@ -425,7 +437,10 @@ public class MManager { if (emptyStorageGroup != null) { StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(emptyStorageGroup); } - logWriter.deleteTimeseries(p); + int logLineNumber = logWriter.deleteTimeseries(p); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } catch (DeleteFailedException e) { failedNames.add(e.getName()); @@ -441,9 +456,6 @@ public class MManager { /** * remove the node from the tag inverted index - * - * @param node - * @throws IOException */ private void removeFromTagInvertedIndex(LeafMNode node) throws IOException { if (node.getOffset() < 0) { @@ -525,7 +537,10 @@ public class MManager { seriesNumberInStorageGroups.put(storageGroup, 0); } if (!isRecovering) { - logWriter.setStorageGroup(storageGroup); + int logLineNumber = logWriter.setStorageGroup(storageGroup); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } catch (IOException e) { throw new MetadataException(e.getMessage()); @@ -569,7 +584,10 @@ public class MManager { } // if success if (!isRecovering) { - logWriter.deleteStorageGroup(storageGroup); + int logLineNumber = logWriter.deleteStorageGroup(storageGroup); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } } catch (ConfigAdjusterException e) { @@ -1078,7 +1096,10 @@ public class MManager { try { getStorageGroupNode(storageGroup).setDataTTL(dataTTL); if (!isRecovering) { - logWriter.setTTL(storageGroup, dataTTL); + int logLineNumber = logWriter.setTTL(storageGroup, dataTTL); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } finally { lock.writeLock().unlock(); @@ -1164,7 +1185,10 @@ public class MManager { leafMNode.getParent().addAlias(alias, leafMNode); leafMNode.setAlias(alias); // persist to WAL - logWriter.changeAlias(fullPath, alias); + int logLineNumber = logWriter.changeAlias(fullPath, alias); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } if (tagsMap == null && attributesMap == null) { @@ -1173,7 +1197,10 @@ public class MManager { // no tag or attribute, we need to add a new record in log if (leafMNode.getOffset() < 0) { long offset = tagLogFile.write(tagsMap, attributesMap); - logWriter.changeOffset(fullPath, offset); + int logLineNumber = logWriter.changeOffset(fullPath, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } leafMNode.setOffset(offset); // update inverted Index map if (tagsMap != null) { @@ -1242,7 +1269,10 @@ public class MManager { // no tag or attribute, we need to add a new record in log if (leafMNode.getOffset() < 0) { long offset = tagLogFile.write(Collections.emptyMap(), attributesMap); - logWriter.changeOffset(fullPath, offset); + int logLineNumber = logWriter.changeOffset(fullPath, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } leafMNode.setOffset(offset); return; } @@ -1285,7 +1315,10 @@ public class MManager { // no tag or attribute, we need to add a new record in log if (leafMNode.getOffset() < 0) { long offset = tagLogFile.write(tagsMap, Collections.emptyMap()); - logWriter.changeOffset(fullPath, offset); + int logLineNumber = logWriter.changeOffset(fullPath, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } leafMNode.setOffset(offset); // update inverted Index map for (Entry<String, String> entry : tagsMap.entrySet()) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 6b944ec..b2ff6f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -24,6 +24,12 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayDeque; import java.util.ArrayList; @@ -40,6 +46,7 @@ import java.util.TreeSet; import java.util.regex.Pattern; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -59,6 +66,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The hierarchical struct of the Metadata Tree is implemented in this class. @@ -66,12 +75,15 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public class MTree implements Serializable { private static final long serialVersionUID = -4200394435237291964L; + private static final Logger logger = LoggerFactory.getLogger(MTree.class); + private MNode root; + private int snapshotlineNumber; - private transient ThreadLocal<Integer> limit = new ThreadLocal<>(); - private transient ThreadLocal<Integer> offset = new ThreadLocal<>(); - private transient ThreadLocal<Integer> count = new ThreadLocal<>(); - private transient ThreadLocal<Integer> curOffset = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> limit = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> offset = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> count = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> curOffset = new ThreadLocal<>(); MTree() { this.root = new InternalMNode(null, IoTDBConstant.PATH_ROOT); @@ -368,13 +380,6 @@ public class MTree implements Serializable { } /** - * Get device node, if the give path is not a device, throw exception - */ - MNode getDeviceNode(String path) throws MetadataException { - return getNodeByPath(path); - } - - /** * Get node by the path * * @return last node in given seriesPath @@ -612,6 +617,7 @@ public class MTree implements Serializable { /** * Traverse the MTree to get the count of timeseries in the given level. + * * @param targetLevel Record the distance to the target level, 0 means the target level. */ private int getCountInGivenLevel(MNode node, int targetLevel) { @@ -847,6 +853,7 @@ public class MTree implements Serializable { /** * Get all paths under the given level. + * * @param targetLevel Record the distance to the target level, 0 means the target level. */ private void findNodes(MNode node, String path, List<String> res, int targetLevel) { @@ -864,6 +871,42 @@ public class MTree implements Serializable { } } + public int getSnapshotlineNumber() { + return snapshotlineNumber; + } + + public void setSnapshotlineNumber(int snapshotlineNumber) { + this.snapshotlineNumber = snapshotlineNumber; + } + + public void serializeTo(String snapshotPath, int lineNumber) throws IOException { + this.setSnapshotlineNumber(lineNumber); + ObjectOutputStream os = new ObjectOutputStream( + new FileOutputStream(SystemFileFactory.INSTANCE.getFile(snapshotPath))); + os.writeObject(this); + os.close(); + } + + public static MTree deserializeFrom(String mtreeSnapshotPath) throws IOException { + File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath); + if (!mtreeSnapshot.exists()) { + return new MTree(); + } + ObjectInputStream oi = new ObjectInputStream(new FileInputStream(mtreeSnapshot)); + MTree mtree = null; + try { + mtree = (MTree) oi.readObject(); + } catch (ClassNotFoundException e) { + logger.error("Failed to deserialize MTree from mtree.snapshot. ", e); + } + oi.close(); + limit = new ThreadLocal<>(); + offset = new ThreadLocal<>(); + count = new ThreadLocal<>(); + curOffset = new ThreadLocal<>(); + return mtree; + } + @Override public String toString() { JSONObject jsonObject = new JSONObject(); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java index ee096bf..d7d8b5b 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.metadata; public class MetadataConstant { - private MetadataConstant(){ - //allowed to do nothing + + private MetadataConstant() { + // allowed to do nothing } + public static final String ROOT = "root"; public static final String METADATA_LOG = "mlog.txt"; public static final String TAG_LOG = "tlog.txt"; + public static final String MTREE_SNAPSHOT = "mtree.snapshot"; }
