This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch mtree_checkpoint in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 02369679bcf8cfa70720db72e9d36a071cf5bf09 Author: samperson1997 <[email protected]> AuthorDate: Tue Jun 16 23:24:25 2020 +0800 [IOTDB-726] CheckPoint of MTree --- .../resources/conf/iotdb-engine.properties | 3 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +- .../org/apache/iotdb/db/metadata/MLogWriter.java | 57 +++++---- .../org/apache/iotdb/db/metadata/MManager.java | 127 ++++++++++++++------- .../java/org/apache/iotdb/db/metadata/MTree.java | 88 +++++++++++++- .../org/apache/iotdb/db/metadata/mnode/MNode.java | 2 +- 7 files changed, 220 insertions(+), 80 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 846228c..c0d21f6 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -204,6 +204,9 @@ tag_attribute_total_size=700 # if enable partial insert, one measurement failure will not impact other measurements enable_partial_insert=true +# The interval line numbers of mlog.txt when creating a checkpoint and saving snapshot of mtree +mtree_snapshot_interval=100000 + #################### ### Memory Control Configuration #################### diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 23c111a..2b96a2d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -568,12 +568,16 @@ public class IoTDBConfig { private int primitiveArraySize = 64; /** - * whether enable data partition - * if disabled, all data belongs to partition 0 + * whether enable data partition if disabled, all data belongs to partition 0 */ private boolean enablePartition = false; /** + * Interval line number of mlog.txt when creating a checkpoint and saving snapshot of mtree + */ + private int mtreeSnapshotInterval = 100000; + + /** * Time range for partitioning data inside each storage group, the unit is second */ private long partitionInterval = 604800; @@ -628,6 +632,14 @@ public class IoTDBConfig { this.enablePartition = enablePartition; } + public int getMtreeSnapshotInterval() { + return mtreeSnapshotInterval; + } + + public void setMtreeSnapshotInterval(int mtreeSnapshotInterval) { + this.mtreeSnapshotInterval = mtreeSnapshotInterval; + } + public long getPartitionInterval() { return partitionInterval; } @@ -1211,7 +1223,8 @@ public class IoTDBConfig { return allocateMemoryForTimeSeriesMetaDataCache; } - public void setAllocateMemoryForTimeSeriesMetaDataCache(long allocateMemoryForTimeSeriesMetaDataCache) { + public void setAllocateMemoryForTimeSeriesMetaDataCache( + long allocateMemoryForTimeSeriesMetaDataCache) { this.allocateMemoryForTimeSeriesMetaDataCache = allocateMemoryForTimeSeriesMetaDataCache; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 333b2d8..0ef9293 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -315,6 +315,9 @@ public class IoTDBDescriptor { Boolean.parseBoolean(properties.getProperty("enable_partial_insert", String.valueOf(conf.isEnablePartialInsert())))); + conf.setMtreeSnapshotInterval(Integer.parseInt(properties.getProperty( + "mtree_snapshot_interval", Integer.toString(conf.getMtreeSnapshotInterval())))); + conf.setEnablePerformanceStat(Boolean .parseBoolean(properties.getProperty("enable_performance_stat", Boolean.toString(conf.isEnablePerformanceStat())).trim())); @@ -428,7 +431,6 @@ public class IoTDBDescriptor { //if using org.apache.iotdb.db.auth.authorizer.OpenIdAuthorizer, openID_url is needed. conf.setOpenIdProviderUrl(properties.getProperty("openID_url", "")); - // At the same time, set TSFileConfig TSFileDescriptor.getInstance().getConfig() .setTSFileStorageFs(FSType.valueOf( diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java index 72ee54b..4a9e834 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java @@ -35,6 +35,7 @@ public class MLogWriter { private static final Logger logger = LoggerFactory.getLogger(MLogWriter.class); private BufferedWriter writer; + private int lineNumber; public MLogWriter(String schemaDir, String logFileName) throws IOException { File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir); @@ -47,21 +48,18 @@ public class MLogWriter { } File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName); - - FileWriter fileWriter; - fileWriter = new FileWriter(logFile, true); + FileWriter fileWriter = new FileWriter(logFile, true); writer = new BufferedWriter(fileWriter); } - public void close() throws IOException { writer.close(); } - public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException { + public int createTimeseries(CreateTimeSeriesPlan plan, long offset) throws IOException { writer.write(String.format("%s,%s,%s,%s,%s", MetadataOperationType.CREATE_TIMESERIES, - plan.getPath().getFullPath(), plan.getDataType().serialize(), plan.getEncoding().serialize(), - plan.getCompressor().serialize())); + plan.getPath().getFullPath(), plan.getDataType().serialize(), + plan.getEncoding().serialize(), plan.getCompressor().serialize())); writer.write(","); if (plan.getProps() != null) { @@ -86,44 +84,37 @@ public class MLogWriter { writer.write(String.valueOf(offset)); } - writer.newLine(); - writer.flush(); + return newLine(); } - public void deleteTimeseries(String path) throws IOException { + public int deleteTimeseries(String path) throws IOException { writer.write(MetadataOperationType.DELETE_TIMESERIES + "," + path); - writer.newLine(); - writer.flush(); + return newLine(); } - public void setStorageGroup(String storageGroup) throws IOException { + public int setStorageGroup(String storageGroup) throws IOException { writer.write(MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup); - writer.newLine(); - writer.flush(); + return newLine(); } - public void deleteStorageGroup(String storageGroup) throws IOException { + public int deleteStorageGroup(String storageGroup) throws IOException { writer.write(MetadataOperationType.DELETE_STORAGE_GROUP + "," + storageGroup); - writer.newLine(); - writer.flush(); + return newLine(); } - public void setTTL(String storageGroup, long ttl) throws IOException { + public int setTTL(String storageGroup, long ttl) throws IOException { writer.write(String.format("%s,%s,%s", MetadataOperationType.SET_TTL, storageGroup, ttl)); - writer.newLine(); - writer.flush(); + return newLine(); } - public void changeOffset(String path, long offset) throws IOException { + public int changeOffset(String path, long offset) throws IOException { writer.write(String.format("%s,%s,%s", MetadataOperationType.CHANGE_OFFSET, path, offset)); - writer.newLine(); - writer.flush(); + return newLine(); } - public void changeAlias(String path, String alias) throws IOException { + public int changeAlias(String path, String alias) throws IOException { writer.write(String.format("%s,%s,%s", MetadataOperationType.CHANGE_ALIAS, path, alias)); - writer.newLine(); - writer.flush(); + return newLine(); } public static void upgradeMLog(String schemaDir, String logFileName) throws IOException { @@ -158,7 +149,6 @@ public class MLogWriter { writer.write(buf.toString()); writer.newLine(); writer.flush(); - } } @@ -166,9 +156,16 @@ public class MLogWriter { if (!logFile.delete()) { throw new IOException("Deleting " + logFile + "failed."); } - + // rename tmpLogFile to mlog FSFactoryProducer.getFSFactory().moveFile(tmpLogFile, logFile); } - + + private int newLine() throws IOException { + writer.newLine(); + writer.flush(); + + // Every MTREE_SNAPSHOT_INTERVAL lines, create a checkpoint and save the MTree as a snapshot + return lineNumber++; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 23b292b..ba4e053 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -83,11 +83,13 @@ public class MManager { private static final Logger logger = LoggerFactory.getLogger(MManager.class); private static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n"; + private final int MTREE_SNAPSHOT_INTERVAL; // the lock for read/insert private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // the log file seriesPath private String logFilePath; + private String mtreeSnapshotPath; private MTree mtree; private MLogWriter logWriter; private TagLogFile tagLogFile; @@ -117,6 +119,7 @@ public class MManager { private MManager() { config = IoTDBDescriptor.getInstance().getConfig(); + MTREE_SNAPSHOT_INTERVAL = config.getMtreeSnapshotInterval(); String schemaDir = config.getSchemaDir(); File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir); if (!schemaFolder.exists()) { @@ -127,6 +130,7 @@ public class MManager { } } logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG; + mtreeSnapshotPath = schemaDir + File.separator + MetadataConstant.MTREE_SNAPSHOT; // do not write log when recover isRecovering = true; @@ -198,12 +202,20 @@ public class MManager { } private void initFromLog(File logFile) throws IOException { + long start = System.nanoTime(); // init the metadata from the operation log - mtree = new MTree(); + mtree = MTree.deserializeFrom(mtreeSnapshotPath); +// mtree = new MTree(); + logger.warn("deserialize: {}", (System.nanoTime() - start) / 1000000); if (logFile.exists()) { try (FileReader fr = new FileReader(logFile); BufferedReader br = new BufferedReader(fr)) { String cmd; + int idx = 0; + while (idx <= mtree.getSnapshotlineNumber()) { + br.readLine(); + idx++; + } while ((cmd = br.readLine()) != null) { try { operation(cmd); @@ -213,6 +225,7 @@ public class MManager { } } } + logger.warn("init from mlog: {}", (System.nanoTime() - start) / 1000000); } /** @@ -358,7 +371,10 @@ public class MManager { || (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) { offset = tagLogFile.write(plan.getTags(), plan.getAttributes()); } - logWriter.createTimeseries(plan, offset); + int logLineNumber = logWriter.createTimeseries(plan, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } leafMNode.setOffset(offset); @@ -372,9 +388,9 @@ public class MManager { /** * Add one timeseries to metadata tree, if the timeseries already exists, throw exception * - * @param path the timeseries path - * @param dataType the dateType {@code DataType} of the timeseries - * @param encoding the encoding function {@code Encoding} of the timeseries + * @param path the timeseries path + * @param dataType the dateType {@code DataType} of the timeseries + * @param encoding the encoding function {@code Encoding} of the timeseries * @param compressor the compressor function {@code Compressor} of the time series * @return whether the measurement occurs for the first time in this storage group (if true, the * measurement should be registered to the StorageEngine too) @@ -425,7 +441,10 @@ public class MManager { if (emptyStorageGroup != null) { StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(emptyStorageGroup); } - logWriter.deleteTimeseries(p); + int logLineNumber = logWriter.deleteTimeseries(p); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } catch (DeleteFailedException e) { failedNames.add(e.getName()); @@ -470,7 +489,8 @@ public class MManager { logger.debug(String.format( "Delete: TimeSeries %s's tag info has been removed from tag inverted index before " + "deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b", - node.getFullPath(), entry.getKey(), entry.getValue(), node.getOffset(), tagIndex.containsKey(entry.getKey()))); + node.getFullPath(), entry.getKey(), entry.getValue(), node.getOffset(), + tagIndex.containsKey(entry.getKey()))); } } } @@ -528,7 +548,10 @@ public class MManager { seriesNumberInStorageGroups.put(storageGroup, 0); } if (!isRecovering) { - logWriter.setStorageGroup(storageGroup); + int logLineNumber = logWriter.setStorageGroup(storageGroup); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } catch (IOException e) { throw new MetadataException(e.getMessage()); @@ -574,7 +597,10 @@ public class MManager { } // if success if (!isRecovering) { - logWriter.deleteStorageGroup(storageGroup); + int logLineNumber = logWriter.deleteStorageGroup(storageGroup); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } } catch (ConfigAdjusterException e) { @@ -651,7 +677,7 @@ public class MManager { * Get all devices under given prefixPath. * * @param prefixPath a prefix of a full path. if the wildcard is not at the tail, then each - * wildcard can only match one level, otherwise it can match to the tail. + * wildcard can only match one level, otherwise it can match to the tail. * @return A HashSet instance which stores devices names with given prefixPath. */ public Set<String> getDevices(String prefixPath) throws MetadataException { @@ -667,9 +693,9 @@ public class MManager { * Get all nodes from the given level * * @param prefixPath can be a prefix of a full path. Can not be a full path. can not have - * wildcard. But, the level of the prefixPath can be smaller than the given - * level, e.g., prefixPath = root.a while the given level is 5 - * @param nodeLevel the level can not be smaller than the level of the prefixPath + * wildcard. But, the level of the prefixPath can be smaller than the given level, e.g., + * prefixPath = root.a while the given level is 5 + * @param nodeLevel the level can not be smaller than the level of the prefixPath * @return A List instance which stores all node at given level */ public List<String> getNodesList(String prefixPath, int nodeLevel) throws MetadataException { @@ -726,7 +752,7 @@ public class MManager { * expression in this method is formed by the amalgamation of seriesPath and the character '*'. * * @param prefixPath can be a prefix or a full path. if the wildcard is not at the tail, then each - * wildcard can only match one level, otherwise it can match to the tail. + * wildcard can only match one level, otherwise it can match to the tail. */ public List<String> getAllTimeseriesName(String prefixPath) throws MetadataException { lock.readLock().lock(); @@ -766,7 +792,7 @@ public class MManager { * To calculate the count of nodes in the given level for given prefix path. * * @param prefixPath a prefix path or a full path, can not contain '*' - * @param level the level can not be smaller than the level of the prefixPath + * @param level the level can not be smaller than the level of the prefixPath */ public int getNodesCountInGivenLevel(String prefixPath, int level) throws MetadataException { lock.readLock().lock(); @@ -917,7 +943,8 @@ public class MManager { throws MetadataException { lock.readLock().lock(); try { - MNode leaf = mtree.getNodeByPath(device).getChild(measurement); + MNode node = mtree.getNodeByPath(device); + MNode leaf = node.getChild(measurement); if (leaf != null) { return ((MeasurementMNode) leaf).getSchema(); } else { @@ -1004,8 +1031,7 @@ public class MManager { } /** - * get device node, if the storage group is not set, create it when autoCreateSchema is true - * <p> + * get device node, if the storage group is not set, create it when autoCreateSchema is true <p> * (we develop this method as we need to get the node's lock after we get the lock.writeLock()) * * <p>!!!!!!Attention!!!!! must call the return node's readUnlock() if you call this method. @@ -1068,7 +1094,7 @@ public class MManager { public MNode getDeviceNode(String path) throws MetadataException { lock.readLock().lock(); - MNode node = null; + MNode node; try { node = mNodeCache.get(path); return node; @@ -1080,10 +1106,10 @@ public class MManager { } /** - * To reduce the String number in memory, - * use the deviceId from MManager instead of the deviceId read from disk - * - * @param deviceId read from disk + * To reduce the String number in memory, use the deviceId from MManager instead of the deviceId + * read from disk + * + * @param path read from disk * @return deviceId */ public String getDeviceId(String path) { @@ -1132,7 +1158,10 @@ public class MManager { try { getStorageGroupNode(storageGroup).setDataTTL(dataTTL); if (!isRecovering) { - logWriter.setTTL(storageGroup, dataTTL); + int logLineNumber = logWriter.setTTL(storageGroup, dataTTL); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } } finally { lock.writeLock().unlock(); @@ -1162,7 +1191,7 @@ public class MManager { * Check whether the given path contains a storage group change or set the new offset of a * timeseries * - * @param path timeseries + * @param path timeseries * @param offset offset in the tag file */ public void changeOffset(String path, long offset) throws MetadataException { @@ -1192,10 +1221,10 @@ public class MManager { * upsert tags and attributes key-value for the timeseries if the key has existed, just use the * new value to update it. * - * @param alias newly added alias - * @param tagsMap newly added tags map + * @param alias newly added alias + * @param tagsMap newly added tags map * @param attributesMap newly added attributes map - * @param fullPath timeseries + * @param fullPath timeseries */ public void upsertTagsAndAttributes(String alias, Map<String, String> tagsMap, Map<String, String> attributesMap, String fullPath) throws MetadataException, IOException { @@ -1218,7 +1247,10 @@ public class MManager { leafMNode.getParent().addAlias(alias, leafMNode); leafMNode.setAlias(alias); // persist to WAL - logWriter.changeAlias(fullPath, alias); + int logLineNumber = logWriter.changeAlias(fullPath, alias); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } } if (tagsMap == null && attributesMap == null) { @@ -1227,7 +1259,10 @@ public class MManager { // no tag or attribute, we need to add a new record in log if (leafMNode.getOffset() < 0) { long offset = tagLogFile.write(tagsMap, attributesMap); - logWriter.changeOffset(fullPath, offset); + int logLineNumber = logWriter.changeOffset(fullPath, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } leafMNode.setOffset(offset); // update inverted Index map if (tagsMap != null) { @@ -1269,7 +1304,8 @@ public class MManager { logger.debug(String.format( "Upsert: TimeSeries %s's tag info has been removed from tag inverted index " + "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b", - leafMNode.getFullPath(), key, beforeValue, leafMNode.getOffset(), tagIndex.containsKey(key))); + leafMNode.getFullPath(), key, beforeValue, leafMNode.getOffset(), + tagIndex.containsKey(key))); } } } @@ -1297,7 +1333,7 @@ public class MManager { * add new attributes key-value for the timeseries * * @param attributesMap newly added attributes map - * @param fullPath timeseries + * @param fullPath timeseries */ public void addAttributes(Map<String, String> attributesMap, String fullPath) throws MetadataException, IOException { @@ -1311,7 +1347,10 @@ public class MManager { // no tag or attribute, we need to add a new record in log if (leafMNode.getOffset() < 0) { long offset = tagLogFile.write(Collections.emptyMap(), attributesMap); - logWriter.changeOffset(fullPath, offset); + int logLineNumber = logWriter.changeOffset(fullPath, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } leafMNode.setOffset(offset); return; } @@ -1339,7 +1378,7 @@ public class MManager { /** * add new tags key-value for the timeseries * - * @param tagsMap newly added tags map + * @param tagsMap newly added tags map * @param fullPath timeseries */ public void addTags(Map<String, String> tagsMap, String fullPath) @@ -1354,7 +1393,10 @@ public class MManager { // no tag or attribute, we need to add a new record in log if (leafMNode.getOffset() < 0) { long offset = tagLogFile.write(tagsMap, Collections.emptyMap()); - logWriter.changeOffset(fullPath, offset); + int logLineNumber = logWriter.changeOffset(fullPath, offset); + if (logLineNumber % MTREE_SNAPSHOT_INTERVAL == 0) { + mtree.serializeTo(mtreeSnapshotPath, logLineNumber); + } leafMNode.setOffset(offset); // update inverted Index map for (Entry<String, String> entry : tagsMap.entrySet()) { @@ -1392,7 +1434,7 @@ public class MManager { /** * drop tags or attributes of the timeseries * - * @param keySet tags key or attributes key + * @param keySet tags key or attributes key * @param fullPath timeseries path */ public void dropTagsOrAttributes(Set<String> keySet, String fullPath) @@ -1449,7 +1491,8 @@ public class MManager { logger.debug(String.format( "Drop: TimeSeries %s's tag info has been removed from tag inverted index " + "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b", - leafMNode.getFullPath(), key, value, leafMNode.getOffset(), tagIndex.containsKey(key))); + leafMNode.getFullPath(), key, value, leafMNode.getOffset(), + tagIndex.containsKey(key))); } } @@ -1525,7 +1568,8 @@ public class MManager { logger.debug(String.format( "Set: TimeSeries %s's tag info has been removed from tag inverted index " + "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b", - leafMNode.getFullPath(), key, beforeValue, leafMNode.getOffset(), tagIndex.containsKey(key))); + leafMNode.getFullPath(), key, beforeValue, leafMNode.getOffset(), + tagIndex.containsKey(key))); } } tagIndex.computeIfAbsent(key, k -> new HashMap<>()) @@ -1539,8 +1583,8 @@ public class MManager { /** * rename the tag or attribute's key of the timeseries * - * @param oldKey old key of tag or attribute - * @param newKey new key of tag or attribute + * @param oldKey old key of tag or attribute + * @param newKey new key of tag or attribute * @param fullPath timeseries */ public void renameTagOrAttributeKey(String oldKey, String newKey, String fullPath) @@ -1590,7 +1634,8 @@ public class MManager { logger.debug(String.format( "Rename: TimeSeries %s's tag info has been removed from tag inverted index " + "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b", - leafMNode.getFullPath(), oldKey, value, leafMNode.getOffset(), tagIndex.containsKey(oldKey))); + leafMNode.getFullPath(), oldKey, value, leafMNode.getOffset(), + tagIndex.containsKey(oldKey))); } } tagIndex.computeIfAbsent(newKey, k -> new HashMap<>()) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 91cbf8a..0940779 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -26,6 +26,12 @@ import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLast import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayDeque; import java.util.ArrayList; @@ -40,11 +46,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; +import java.util.Stack; +import java.util.TreeMap; import java.util.TreeSet; import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -72,17 +81,24 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public class MTree implements Serializable { private static final long serialVersionUID = -4200394435237291964L; + private MNode root; + private int snapshotlineNumber; - private transient ThreadLocal<Integer> limit = new ThreadLocal<>(); - private transient ThreadLocal<Integer> offset = new ThreadLocal<>(); - private transient ThreadLocal<Integer> count = new ThreadLocal<>(); - private transient ThreadLocal<Integer> curOffset = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> limit = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> offset = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> count = new ThreadLocal<>(); + private transient static ThreadLocal<Integer> curOffset = new ThreadLocal<>(); MTree() { this.root = new MNode(null, IoTDBConstant.PATH_ROOT); } + public MTree(MNode root, int snapshotlineNumber) { + this.root = root; + this.snapshotlineNumber = snapshotlineNumber; + } + /** * Create a timeseries with a full path from root to leaf node Before creating a timeseries, the * storage group should be set first, throw exception otherwise @@ -897,6 +913,70 @@ public class MTree implements Serializable { } } + public int getSnapshotlineNumber() { + return snapshotlineNumber; + } + + public void serializeTo(String snapshotPath, int lineNumber) throws IOException { + BufferedWriter bw = new BufferedWriter( + new FileWriter(SystemFileFactory.INSTANCE.getFile(snapshotPath))); + bw.write(String.valueOf(lineNumber)); + bw.newLine(); + root.serializeTo(bw); + bw.close(); + } + + public static MTree deserializeFrom(String mtreeSnapshotPath) throws IOException { + File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath); + if (!mtreeSnapshot.exists()) { + return new MTree(); + } + BufferedReader br = new BufferedReader(new FileReader(mtreeSnapshot)); + int snapshotLineNumber = Integer.valueOf(br.readLine()); + String s; + Deque<MNode> nodeStack = new ArrayDeque<>(); + MNode node = null; + + while ((s = br.readLine()) != null) { + String[] nodeInfo = s.split(","); + short nodeType = Short.valueOf(nodeInfo[0]); + if (nodeType == MetadataConstant.STORAGE_GROUP_MNODE_TYPE) { + node = StorageGroupMNode.deserializeFrom(nodeInfo); + } else if (nodeType == MetadataConstant.MEASUREMENT_MNODE_TYPE) { + node = MeasurementMNode.deserializeFrom(nodeInfo); + } else { + node = new MNode(null, nodeInfo[1]); + } + + int childrenSize = Integer.valueOf(nodeInfo[nodeInfo.length - 1]); + if (childrenSize == 0) { + nodeStack.push(node); + } else { + Map<String, MNode> childrenMap = new TreeMap<>(); + for (int i = 0; i < childrenSize; i++) { + MNode child = nodeStack.removeFirst(); + child.setParent(node); + childrenMap.put(child.getName(), child); + if (child instanceof MeasurementMNode) { + String alias = ((MeasurementMNode) child).getAlias(); + if (alias != null) { + node.addAlias(alias, child); + } + } + } + node.setChildren(childrenMap); + nodeStack.push(node); + } + } + br.close(); + + limit = new ThreadLocal<>(); + offset = new ThreadLocal<>(); + count = new ThreadLocal<>(); + curOffset = new ThreadLocal<>(); + return new MTree(node, snapshotLineNumber); + } + @Override public String toString() { JSONObject jsonObject = new JSONObject(); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java index a34df03..002540d 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java @@ -85,7 +85,7 @@ public class MNode implements Serializable { } /** - * If delete a leafMNode, lock its parent, if delete an InternalNode, lock itself + * delete a child */ public void deleteChild(String name) throws DeleteFailedException { if (children != null && children.containsKey(name)) {
