This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch jira_759 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit bdc587bb23b030ee79b674918c48cb687397f6ca Author: samperson1997 <[email protected]> AuthorDate: Thu Jun 11 09:55:13 2020 +0800 [IOTDB-759] Refactor MNode by removing InternalMNode --- .../iotdb/db/engine/merge/task/MergeTask.java | 21 ++- .../engine/storagegroup/StorageGroupProcessor.java | 13 +- .../org/apache/iotdb/db/metadata/MManager.java | 25 +-- .../java/org/apache/iotdb/db/metadata/MTree.java | 121 +++++++-------- .../apache/iotdb/db/metadata/MetadataConstant.java | 11 +- .../iotdb/db/metadata/mnode/InternalMNode.java | 135 ----------------- .../org/apache/iotdb/db/metadata/mnode/MNode.java | 167 +++++++++++++++++++-- .../iotdb/db/metadata/mnode/MeasurementMNode.java | 74 ++++++++- .../iotdb/db/metadata/mnode/StorageGroupMNode.java | 44 +++++- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 44 ++++-- .../iotdb/db/metadata/MManagerImproveTest.java | 5 +- .../iotdb/tsfile/utils/ReadWriteIOUtils.java | 8 +- 12 files changed, 394 insertions(+), 274 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java index b6ac065..045fdf2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java @@ -35,9 +35,8 @@ import org.apache.iotdb.db.engine.merge.recover.MergeLogger; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.metadata.mnode.InternalMNode; -import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.MNode; +import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.utils.MergeUtils; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -46,10 +45,9 @@ 
import org.slf4j.LoggerFactory; /** * MergeTask merges given seqFiles and unseqFiles into new ones, which basically consists of three - * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files - * 2. move the merged chunks in the temp files back to the seqFiles or move the unmerged - * chunks in the seqFiles into temp files and replace the seqFiles with the temp files. - * 3. remove unseqFiles + * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files 2. move the + * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles + * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles */ public class MergeTask implements Callable<Void> { @@ -92,14 +90,15 @@ public class MergeTask implements Callable<Void> { @Override public Void call() throws Exception { - try { + try { doMerge(); } catch (Exception e) { logger.error("Runtime exception in merge {}", taskName, e); cleanUp(false); // call the callback to make sure the StorageGroup exit merging status, but passing 2 // empty file lists to avoid files being deleted. 
- callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME)); + callback.call(Collections.emptyList(), Collections.emptyList(), + new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME)); throw e; } return null; @@ -121,7 +120,7 @@ public class MergeTask implements Callable<Void> { Map<Path, MeasurementSchema> measurementSchemaMap = new HashMap<>(); List<Path> unmergedSeries = new ArrayList<>(); for (String device : devices) { - InternalMNode deviceNode = (InternalMNode) MManager.getInstance().getNodeByPath(device); + MNode deviceNode = MManager.getInstance().getNodeByPath(device); for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) { Path path = new Path(device, entry.getKey()); measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema()); @@ -132,8 +131,8 @@ public class MergeTask implements Callable<Void> { mergeLogger.logMergeStart(); - MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource, - fullMerge, unmergedSeries, concurrentMergeSeriesNum); + MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, taskName, + mergeLogger, resource, fullMerge, unmergedSeries, concurrentMergeSeriesNum); mergeChunkTask.mergeSeries(); MergeFileTask mergeFileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 8ba9020..df4cb38 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -72,7 +72,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import 
org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.metadata.mnode.InternalMNode; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; @@ -774,7 +773,7 @@ public class StorageGroupProcessor { throw new WriteProcessException(e); } finally { if (node != null) { - ((InternalMNode) node).readUnlock(); + node.readUnlock(); } } } @@ -832,7 +831,7 @@ public class StorageGroupProcessor { throw new WriteProcessException(e); } finally { if (node != null) { - ((InternalMNode) node).readUnlock(); + node.readUnlock(); } } } @@ -1247,7 +1246,7 @@ public class StorageGroupProcessor { .query(deviceId, measurementId, schema.getType(), schema.getEncodingType(), schema.getProps(), context); - tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), + tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), tsFileResource.getDeviceToIndexMap(), tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), pair.left, pair.right)); @@ -1509,8 +1508,8 @@ public class StorageGroupProcessor { List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources(); for (TsFileResource resource : upgradedResources) { long partitionId = resource.getTimePartition(); - resource.getDeviceToIndexMap().forEach((device, index) -> - updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, + resource.getDeviceToIndexMap().forEach((device, index) -> + updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, resource.getEndTime(index)) ); } @@ -1525,7 +1524,7 @@ public class StorageGroupProcessor { } mergeLock.writeLock().unlock(); insertLock.writeLock().unlock(); - + // after upgrade complete, update partitionLatestFlushedTimeForEachDevice if (countUpgradeFiles() == 0) { for (Entry<Long, Map<String, Long>> entry : 
newlyFlushedPartitionLatestFlushedTimeForEachDevice diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index ec46b77..d803d51 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -52,9 +52,8 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; -import org.apache.iotdb.db.metadata.mnode.InternalMNode; -import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.MNode; +import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.monitor.MonitorConstants; import org.apache.iotdb.db.qp.constant.SQLConstant; @@ -441,9 +440,6 @@ public class MManager { /** * remove the node from the tag inverted index - * - * @param node - * @throws IOException */ private void removeFromTagInvertedIndex(MeasurementMNode node) throws IOException { if (node.getOffset() < 0) { @@ -633,7 +629,8 @@ public class MManager { if (!deviceNode.hasChild(measurements[i])) { throw new MetadataException(measurements[i] + " does not exist in " + deviceId); } - measurementSchemas[i] = ((MeasurementMNode) deviceNode.getChild(measurements[i])).getSchema(); + measurementSchemas[i] = ((MeasurementMNode) deviceNode.getChild(measurements[i])) + .getSchema(); } return measurementSchemas; } finally { @@ -782,7 +779,8 @@ public class MManager { if (value2Node.isEmpty()) { throw new MetadataException("The key " + plan.getKey() + " is not a tag."); } - Set<MeasurementMNode> allMatchedNodes = new TreeSet<>(Comparator.comparing(MNode::getFullPath)); + 
Set<MeasurementMNode> allMatchedNodes = new TreeSet<>( + Comparator.comparing(MNode::getFullPath)); if (plan.isContains()) { for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) { String tagValue = entry.getKey(); @@ -893,8 +891,7 @@ public class MManager { throws MetadataException { lock.readLock().lock(); try { - InternalMNode node = (InternalMNode) mtree.getNodeByPath(device); - MNode leaf = node.getChild(measurement); + MNode leaf = mtree.getNodeByPath(device).getChild(measurement); if (leaf != null) { return ((MeasurementMNode) leaf).getSchema(); } else { @@ -1003,7 +1000,7 @@ public class MManager { } } finally { if (node != null) { - ((InternalMNode) node).readLock(); + node.readLock(); } lock.readLock().unlock(); } @@ -1029,7 +1026,7 @@ public class MManager { return node; } finally { if (node != null) { - ((InternalMNode) node).readLock(); + node.readLock(); } lock.writeLock().unlock(); } @@ -1535,9 +1532,6 @@ public class MManager { /** * Collect the timeseries schemas under "startingPath". Notice the measurements in the collected * MeasurementSchemas are the full path here. 
- * - * @param startingPath - * @param timeseriesSchemas */ public void collectSeries(String startingPath, List<MeasurementSchema> timeseriesSchemas) { MNode mNode; @@ -1585,9 +1579,6 @@ public class MManager { /** * if the path is in local mtree, nothing needed to do (because mtree is in the memory); Otherwise * cache the path to mRemoteSchemaCache - * - * @param path - * @param schema */ public void cacheSchema(String path, MeasurementSchema schema) { // check schema is in local diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index bfb2398..0f9975f 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -18,9 +18,26 @@ */ package org.apache.iotdb.db.metadata; +import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR; +import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD; + import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.Serializable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Pattern; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException; @@ -30,9 +47,8 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; -import 
org.apache.iotdb.db.metadata.mnode.InternalMNode; -import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.MNode; +import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -43,24 +59,6 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import java.io.Serializable; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.TreeSet; -import java.util.regex.Pattern; - -import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR; -import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD; - /** * The hierarchical struct of the Metadata Tree is implemented in this class. 
*/ @@ -75,19 +73,19 @@ public class MTree implements Serializable { private transient ThreadLocal<Integer> curOffset = new ThreadLocal<>(); MTree() { - this.root = new InternalMNode(null, IoTDBConstant.PATH_ROOT); + this.root = new MNode(null, IoTDBConstant.PATH_ROOT); } /** * Create a timeseries with a full path from root to leaf node Before creating a timeseries, the * storage group should be set first, throw exception otherwise * - * @param path timeseries path - * @param dataType data type - * @param encoding encoding + * @param path timeseries path + * @param dataType data type + * @param encoding encoding * @param compressor compressor - * @param props props - * @param alias alias of measurement + * @param props props + * @param alias alias of measurement */ MeasurementMNode createTimeseries( String path, @@ -113,7 +111,7 @@ public class MTree implements Serializable { if (!hasSetStorageGroup) { throw new StorageGroupNotSetException("Storage group should be created first"); } - cur.addChild(nodeName, new InternalMNode(cur, nodeName)); + cur.addChild(nodeName, new MNode(cur, nodeName)); } cur = cur.getChild(nodeName); } @@ -124,7 +122,8 @@ public class MTree implements Serializable { if (alias != null && cur.hasChild(alias)) { throw new AliasAlreadyExistException(path, alias); } - MeasurementMNode leaf = new MeasurementMNode(cur, leafName, alias, dataType, encoding, compressor, props); + MeasurementMNode leaf = new MeasurementMNode(cur, leafName, alias, dataType, encoding, + compressor, props); cur.addChild(leafName, leaf); // link alias to LeafMNode if (alias != null) { @@ -150,7 +149,7 @@ public class MTree implements Serializable { cur.addChild(nodeNames[i], new StorageGroupMNode(cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL())); } else { - cur.addChild(nodeNames[i], new InternalMNode(cur, nodeNames[i])); + cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i])); } } cur = cur.getChild(nodeNames[i]); @@ -196,7 +195,7 @@ public 
class MTree implements Serializable { while (i < nodeNames.length - 1) { MNode temp = cur.getChild(nodeNames[i]); if (temp == null) { - cur.addChild(nodeNames[i], new InternalMNode(cur, nodeNames[i])); + cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i])); } else if (temp instanceof StorageGroupMNode) { // before set storage group, check whether the exists or not throw new StorageGroupAlreadySetException(temp.getFullPath()); @@ -468,7 +467,7 @@ public class MTree implements Serializable { MNode current = nodeStack.pop(); if (current instanceof StorageGroupMNode) { ret.add((StorageGroupMNode) current); - } else if (current instanceof InternalMNode) { + } else if (current instanceof MNode) { nodeStack.addAll(current.getChildren().values()); } } @@ -601,7 +600,7 @@ public class MTree implements Serializable { if (child instanceof MeasurementMNode) { cnt++; } - cnt += getCount(child, nodes, idx + 1); + cnt += getCount(child, nodes, idx + 1); } return cnt; } @@ -609,6 +608,7 @@ public class MTree implements Serializable { /** * Traverse the MTree to get the count of timeseries in the given level. + * * @param targetLevel Record the distance to the target level, 0 means the target level. 
*/ private int getCountInGivenLevel(MNode node, int targetLevel) { @@ -616,7 +616,7 @@ public class MTree implements Serializable { return 1; } int cnt = 0; - if (node instanceof InternalMNode) { + if (node instanceof MNode) { for (MNode child : node.getChildren().values()) { cnt += getCountInGivenLevel(child, targetLevel - 1); } @@ -672,23 +672,23 @@ public class MTree implements Serializable { if (node.getName().contains(TsFileConstant.PATH_SEPARATOR)) { nodeName = "\"" + node + "\""; } else { - nodeName = node.getName(); - } - String nodePath = parent + nodeName; - String[] tsRow = new String[7]; - tsRow[0] = nodePath; - tsRow[1] = ((MeasurementMNode) node).getAlias(); - MeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema(); - tsRow[2] = getStorageGroupName(nodePath); - tsRow[3] = measurementSchema.getType().toString(); - tsRow[4] = measurementSchema.getEncodingType().toString(); - tsRow[5] = measurementSchema.getCompressor().toString(); - tsRow[6] = String.valueOf(((MeasurementMNode) node).getOffset()); - timeseriesSchemaList.add(tsRow); - - if (hasLimit) { - count.set(count.get() + 1); - } + nodeName = node.getName(); + } + String nodePath = parent + nodeName; + String[] tsRow = new String[7]; + tsRow[0] = nodePath; + tsRow[1] = ((MeasurementMNode) node).getAlias(); + MeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema(); + tsRow[2] = getStorageGroupName(nodePath); + tsRow[3] = measurementSchema.getType().toString(); + tsRow[4] = measurementSchema.getEncodingType().toString(); + tsRow[5] = measurementSchema.getCompressor().toString(); + tsRow[6] = String.valueOf(((MeasurementMNode) node).getOffset()); + timeseriesSchemaList.add(tsRow); + + if (hasLimit) { + count.set(count.get() + 1); + } } String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes); if (!nodeReg.contains(PATH_WILDCARD)) { @@ -728,11 +728,11 @@ public class MTree implements Serializable { /** * Traverse the MTree to match all child node path in next level 
* - * @param node the current traversing node - * @param nodes split the prefix path with '.' - * @param idx the current index of array nodes + * @param node the current traversing node + * @param nodes split the prefix path with '.' + * @param idx the current index of array nodes * @param parent store the node string having traversed - * @param res store all matched device names + * @param res store all matched device names * @param length expected length of path */ private void findChildNodePathInNextLevel( @@ -746,7 +746,7 @@ public class MTree implements Serializable { parent + node.getName() + PATH_SEPARATOR, res, length); } } else { - if (node instanceof InternalMNode && node.getChildren().size() > 0) { + if (node instanceof MNode && node.getChildren().size() > 0) { for (MNode child : node.getChildren().values()) { if (!Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) { continue; @@ -788,11 +788,11 @@ public class MTree implements Serializable { /** * Traverse the MTree to match all devices with prefix path. * - * @param node the current traversing node - * @param nodes split the prefix path with '.' - * @param idx the current index of array nodes + * @param node the current traversing node + * @param nodes split the prefix path with '.' + * @param idx the current index of array nodes * @param parent store the node string having traversed - * @param res store all matched device names + * @param res store all matched device names */ private void findDevices(MNode node, String[] nodes, int idx, String parent, Set<String> res) { String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes); @@ -840,6 +840,7 @@ public class MTree implements Serializable { /** * Get all paths under the given level. + * * @param targetLevel Record the distance to the target level, 0 means the target level. 
*/ private void findNodes(MNode node, String path, List<String> res, int targetLevel) { @@ -850,7 +851,7 @@ public class MTree implements Serializable { res.add(path); return; } - if (node instanceof InternalMNode) { + if (node instanceof MNode) { for (MNode child : node.getChildren().values()) { findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1); } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java index ee096bf..5aeab5b 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java @@ -19,10 +19,17 @@ package org.apache.iotdb.db.metadata; public class MetadataConstant { - private MetadataConstant(){ - //allowed to do nothing + + private MetadataConstant() { + // allowed to do nothing } + public static final String ROOT = "root"; public static final String METADATA_LOG = "mlog.txt"; public static final String TAG_LOG = "tlog.txt"; + public static final String MTREE_SNAPSHOT = "mtree.snapshot"; + + public static final short MNODE_TYPE = 0; + public static final short STORAGE_GROUP_MNODE_TYPE = 1; + public static final short MEASUREMENT_MNODE_TYPE = 2; } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java deleted file mode 100644 index fa2861f..0000000 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.metadata.mnode; - -import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.iotdb.db.exception.metadata.DeleteFailedException; - -public class InternalMNode extends MNode { - - private static final long serialVersionUID = 7999036474525817732L; - - private Map<String, MNode> children; - private Map<String, MNode> aliasChildren; - - protected ReadWriteLock lock = new ReentrantReadWriteLock(); - - public InternalMNode(MNode parent, String name) { - super(parent, name); - this.children = new LinkedHashMap<>(); - } - - @Override - public boolean hasChild(String name) { - return this.children.containsKey(name) || - (aliasChildren != null && aliasChildren.containsKey(name)); - } - - @Override - public void addChild(String name, MNode child) { - children.put(name, child); - } - - - /** - * If delete a leafMNode, lock its parent, if delete an InternalNode, lock itself - */ - @Override - public void deleteChild(String name) throws DeleteFailedException { - if (children.containsKey(name)) { - Lock writeLock; - // if its child node is leaf node, we need to acquire the write lock of the current device node - if (children.get(name) 
instanceof MeasurementMNode) { - writeLock = lock.writeLock(); - } else { - // otherwise, we only need to acquire the write lock of its child node. - writeLock = ((InternalMNode) children.get(name)).lock.writeLock(); - } - if (writeLock.tryLock()) { - children.remove(name); - writeLock.unlock(); - } else { - throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + name); - } - } - } - - @Override - public void deleteAliasChild(String alias) throws DeleteFailedException { - if (aliasChildren == null) { - return; - } - if (lock.writeLock().tryLock()) { - aliasChildren.remove(alias); - lock.writeLock().unlock(); - } else { - throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + alias); - } - } - - @Override - public MNode getChild(String name) { - return children.containsKey(name) ? children.get(name) - : (aliasChildren == null ? null : aliasChildren.get(name)); - } - - @Override - public int getLeafCount() { - int leafCount = 0; - for (MNode child : this.children.values()) { - leafCount += child.getLeafCount(); - } - return leafCount; - } - - @Override - public void addAlias(String alias, MNode child) { - if (aliasChildren == null) { - aliasChildren = new LinkedHashMap<>(); - } - aliasChildren.put(alias, child); - } - - @Override - public Map<String, MNode> getChildren() { - return children; - } - - public void readLock() { - InternalMNode node = this; - while (node != null) { - node.lock.readLock().lock(); - node = (InternalMNode) node.parent; - } - } - - public void readUnlock() { - InternalMNode node = this; - while (node != null) { - node.lock.readLock().unlock(); - node = (InternalMNode) node.parent; - } - } -} \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java index 181c309..44eaf9d 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java +++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java @@ -18,17 +18,29 @@ */ package org.apache.iotdb.db.metadata.mnode; -import org.apache.iotdb.db.conf.IoTDBConstant; -import org.apache.iotdb.db.exception.metadata.DeleteFailedException; +import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.exception.metadata.DeleteFailedException; +import org.apache.iotdb.db.metadata.MetadataConstant; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; /** * This class is the implementation of Metadata Node. One MNode instance represents one node in the * Metadata Tree */ -public abstract class MNode implements Serializable { +public class MNode implements Serializable { private static final long serialVersionUID = -770028375899514063L; @@ -44,6 +56,10 @@ public abstract class MNode implements Serializable { */ protected String fullPath; + private Map<String, MNode> children; + private Map<String, MNode> aliasChildren; + + protected ReadWriteLock lock = new ReentrantReadWriteLock(); /** * Constructor of MNode. 
@@ -51,42 +67,89 @@ public abstract class MNode implements Serializable { public MNode(MNode parent, String name) { this.parent = parent; this.name = name; + this.children = new LinkedHashMap<>(); } /** * check whether the MNode has a child with the name */ - public abstract boolean hasChild(String name); + public boolean hasChild(String name) { + return this.children.containsKey(name) || + (aliasChildren != null && aliasChildren.containsKey(name)); + } /** * node key, name or alias */ - public abstract void addChild(String name, MNode child); + public void addChild(String name, MNode child) { + children.put(name, child); + } /** - * delete a child + * If delete a leafMNode, lock its parent, if delete an InternalNode, lock itself */ - public abstract void deleteChild(String name) throws DeleteFailedException; + public void deleteChild(String name) throws DeleteFailedException { + if (children.containsKey(name)) { + Lock writeLock; + // if its child node is leaf node, we need to acquire the write lock of the current device node + if (children.get(name) instanceof MeasurementMNode) { + writeLock = lock.writeLock(); + } else { + // otherwise, we only need to acquire the write lock of its child node. 
+ writeLock = (children.get(name)).lock.writeLock(); + } + if (writeLock.tryLock()) { + children.remove(name); + writeLock.unlock(); + } else { + throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + name); + } + } + } /** * delete the alias of a child */ - public abstract void deleteAliasChild(String alias) throws DeleteFailedException; + public void deleteAliasChild(String alias) throws DeleteFailedException { + if (aliasChildren == null) { + return; + } + if (lock.writeLock().tryLock()) { + aliasChildren.remove(alias); + lock.writeLock().unlock(); + } else { + throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + alias); + } + } /** * get the child with the name */ - public abstract MNode getChild(String name); + public MNode getChild(String name) { + return children.containsKey(name) ? children.get(name) + : (aliasChildren == null ? null : aliasChildren.get(name)); + } /** * get the count of all leaves whose ancestor is current node */ - public abstract int getLeafCount(); + public int getLeafCount() { + int leafCount = 0; + for (MNode child : this.children.values()) { + leafCount += child.getLeafCount(); + } + return leafCount; + } /** * add an alias */ - public abstract void addAlias(String alias, MNode child); + public void addAlias(String alias, MNode child) { + if (aliasChildren == null) { + aliasChildren = new LinkedHashMap<>(); + } + aliasChildren.put(alias, child); + } /** * get full path @@ -118,7 +181,9 @@ public abstract class MNode implements Serializable { return parent; } - public abstract Map<String, MNode> getChildren(); + public Map<String, MNode> getChildren() { + return children; + } public String getName() { return name; @@ -127,4 +192,82 @@ public abstract class MNode implements Serializable { public void setName(String name) { this.name = name; } + + public void setChildren(Map<String, MNode> children) { + this.children = children; + } + + public void setAliasChildren(Map<String, MNode> aliasChildren) { + 
this.aliasChildren = aliasChildren; + } + + public void serializeTo(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(MetadataConstant.MNODE_TYPE, outputStream); + ReadWriteIOUtils.write(name, outputStream); + serializeChildren(outputStream); + } + + void serializeChildren(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(children.size(), outputStream); + for (Entry<String, MNode> entry : children.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + entry.getValue().serializeTo(outputStream); + } + + if (aliasChildren == null) { + ReadWriteIOUtils.write(0, outputStream); + } else { + ReadWriteIOUtils.write(aliasChildren.size(), outputStream); + for (Entry<String, MNode> entry : aliasChildren.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + entry.getValue().serializeTo(outputStream); + } + } + } + + public static MNode deserializeFrom(InputStream inputStream, MNode parent) throws IOException { + short nodeType = ReadWriteIOUtils.readShort(inputStream); + MNode node; + if (nodeType == MetadataConstant.STORAGE_GROUP_MNODE_TYPE) { + return StorageGroupMNode.deserializeFrom(inputStream, parent); + } else if (nodeType == MetadataConstant.MEASUREMENT_MNODE_TYPE) { + return MeasurementMNode.deserializeFrom(inputStream, parent); + } else { + node = new MNode(parent, ReadWriteIOUtils.readString(inputStream)); + } + + int childrenSize = ReadWriteIOUtils.readInt(inputStream); + Map<String, MNode> children = new HashMap<>(); + for (int i = 0; i < childrenSize; i++) { + children.put(ReadWriteIOUtils.readString(inputStream), + MNode.deserializeFrom(inputStream, node)); + } + node.setChildren(children); + + int aliasChildrenSize = ReadWriteIOUtils.readInt(inputStream); + Map<String, MNode> aliasChildren = new HashMap<>(); + for (int i = 0; i < aliasChildrenSize; i++) { + children.put(ReadWriteIOUtils.readString(inputStream), + MNode.deserializeFrom(inputStream, node)); + } + 
node.setAliasChildren(aliasChildren); + + return node; + } + + public void readLock() { + MNode node = this; + while (node != null) { + node.lock.readLock().lock(); + node = node.parent; + } + } + + public void readUnlock() { + MNode node = this; + while (node != null) { + node.lock.readLock().unlock(); + node = node.parent; + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java index 6dd4aa1..02f2a7d 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java @@ -18,18 +18,23 @@ */ package org.apache.iotdb.db.metadata.mnode; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import org.apache.iotdb.db.metadata.MetadataConstant; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import java.util.Map; - /** * Represents an (Internal-)MNode which has a Measurement or Sensor attached to it. 
*/ -public class MeasurementMNode extends InternalMNode { +public class MeasurementMNode extends MNode { private static final long serialVersionUID = -1199657856921206435L; @@ -47,12 +52,19 @@ public class MeasurementMNode extends InternalMNode { * @param alias alias of measurementName */ public MeasurementMNode(MNode parent, String measurementName, String alias, TSDataType dataType, - TSEncoding encoding, CompressionType type, Map<String, String> props) { + TSEncoding encoding, CompressionType type, Map<String, String> props) { super(parent, measurementName); this.schema = new MeasurementSchema(measurementName, dataType, encoding, type, props); this.alias = alias; } + public MeasurementMNode(MNode parent, String measurementName, MeasurementSchema schema, + String alias) { + super(parent, measurementName); + this.schema = schema; + this.alias = alias; + } + public MeasurementSchema getSchema() { return schema; } @@ -63,7 +75,9 @@ public class MeasurementMNode extends InternalMNode { public synchronized void updateCachedLast( TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { - if (timeValuePair == null || timeValuePair.getValue() == null) return; + if (timeValuePair == null || timeValuePair.getValue() == null) { + return; + } if (cachedLastValuePair == null) { // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will update cache. 
@@ -73,7 +87,7 @@ public class MeasurementMNode extends InternalMNode { } } else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp() || (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp() - && highPriorityUpdate)) { + && highPriorityUpdate)) { cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp()); cachedLastValuePair.setValue(timeValuePair.getValue()); } @@ -103,4 +117,52 @@ public class MeasurementMNode extends InternalMNode { public void setAlias(String alias) { this.alias = alias; } + + public void setSchema(MeasurementSchema schema) { + this.schema = schema; + } + + @Override + public void serializeTo(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(MetadataConstant.MEASUREMENT_MNODE_TYPE, outputStream); + ReadWriteIOUtils.write(name, outputStream); + + ReadWriteIOUtils.writeIsNull(alias, outputStream); + if (alias != null) { + ReadWriteIOUtils.write(alias, outputStream); + } + schema.serializeTo(outputStream); + ReadWriteIOUtils.write(offset, outputStream); + serializeChildren(outputStream); + } + + public static MeasurementMNode deserializeFrom(InputStream inputStream, MNode parent) + throws IOException { + String name = ReadWriteIOUtils.readString(inputStream); + String alias = null; + if (!ReadWriteIOUtils.readIsNull(inputStream)) { + alias = ReadWriteIOUtils.readString(inputStream); + } + MeasurementMNode node = new MeasurementMNode(parent, name, + MeasurementSchema.deserializeFrom(inputStream), alias); + node.setOffset(ReadWriteIOUtils.readLong(inputStream)); + + int childrenSize = ReadWriteIOUtils.readInt(inputStream); + Map<String, MNode> children = new HashMap<>(); + for (int i = 0; i < childrenSize; i++) { + children + .put(ReadWriteIOUtils.readString(inputStream), MNode.deserializeFrom(inputStream, node)); + } + node.setChildren(children); + + int aliasChildrenSize = ReadWriteIOUtils.readInt(inputStream); + Map<String, MNode> aliasChildren = new HashMap<>(); + for (int i = 0; i < 
aliasChildrenSize; i++) { + aliasChildren + .put(ReadWriteIOUtils.readString(inputStream), MNode.deserializeFrom(inputStream, node)); + } + node.setAliasChildren(aliasChildren); + + return node; + } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java index 02c668f..82766c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/StorageGroupMNode.java @@ -18,7 +18,15 @@ */ package org.apache.iotdb.db.metadata.mnode; -public class StorageGroupMNode extends InternalMNode { +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import org.apache.iotdb.db.metadata.MetadataConstant; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +public class StorageGroupMNode extends MNode { private static final long serialVersionUID = 7999036474525817732L; @@ -28,7 +36,6 @@ public class StorageGroupMNode extends InternalMNode { */ private long dataTTL; - public StorageGroupMNode(MNode parent, String name, long dataTTL) { super(parent, name); this.dataTTL = dataTTL; @@ -43,4 +50,37 @@ public class StorageGroupMNode extends InternalMNode { this.dataTTL = dataTTL; } + @Override + public void serializeTo(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(MetadataConstant.STORAGE_GROUP_MNODE_TYPE, outputStream); + ReadWriteIOUtils.write(name, outputStream); + ReadWriteIOUtils.write(dataTTL, outputStream); + + serializeChildren(outputStream); + } + + public static StorageGroupMNode deserializeFrom(InputStream inputStream, MNode parent) + throws IOException { + String name = ReadWriteIOUtils.readString(inputStream); + StorageGroupMNode node = new StorageGroupMNode(parent, name, + ReadWriteIOUtils.readLong(inputStream)); + + int
childrenSize = ReadWriteIOUtils.readInt(inputStream); + Map<String, MNode> children = new HashMap<>(); + for (int i = 0; i < childrenSize; i++) { + children.put(ReadWriteIOUtils.readString(inputStream), + MNode.deserializeFrom(inputStream, node)); + } + node.setChildren(children); + + int aliasChildrenSize = ReadWriteIOUtils.readInt(inputStream); + Map<String, MNode> aliasChildren = new HashMap<>(); + for (int i = 0; i < aliasChildrenSize; i++) { + aliasChildren.put(ReadWriteIOUtils.readString(inputStream), + MNode.deserializeFrom(inputStream, node)); + } + node.setAliasChildren(aliasChildren); + + return node; + } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index fd53ec3..b628a0f 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -50,7 +50,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; - import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer; import org.apache.iotdb.db.auth.authorizer.IAuthorizer; @@ -76,16 +75,25 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.metadata.mnode.InternalMNode; -import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.MNode; +import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; import
org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.*; +import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; +import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.DeletePlan; +import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.QueryPlan; +import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan; @@ -94,8 +102,8 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan; import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; -import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; +import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan; import org.apache.iotdb.db.qp.physical.sys.MergePlan; import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; @@ -810,7 +818,8 @@ public class PlanExecutor implements IPlanExecutor { schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap()); - } else if (!(node.getChild(chunkMetadata.getMeasurementUid()) instanceof MeasurementMNode)) { + } else if (!(node + 
.getChild(chunkMetadata.getMeasurementUid()) instanceof MeasurementMNode)) { throw new QueryProcessException( String.format("Current Path is not leaf node. %s", series)); } @@ -818,7 +827,7 @@ public class PlanExecutor implements IPlanExecutor { } } finally { if (node != null) { - ((InternalMNode) node).readUnlock(); + node.readUnlock(); } } } @@ -893,7 +902,8 @@ public class PlanExecutor implements IPlanExecutor { insertPlan.setSchemasAndTransferType(schemas); StorageEngine.getInstance().insert(insertPlan); if (insertPlan.getFailedMeasurements() != null) { - throw new StorageEngineException("failed to insert points " + insertPlan.getFailedMeasurements()); + throw new StorageEngineException( + "failed to insert points " + insertPlan.getFailedMeasurements()); } } catch (StorageEngineException | MetadataException e) { throw new QueryProcessException(e); @@ -919,7 +929,8 @@ public class PlanExecutor implements IPlanExecutor { measurementList[i] = schemas[i].getMeasurementId(); } } catch (MetadataException e) { - logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i], e.getMessage()); + logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i], + e.getMessage()); if (enablePartialInsert) { insertPlan.markMeasurementInsertionFailed(i); } else { @@ -929,7 +940,7 @@ public class PlanExecutor implements IPlanExecutor { } } finally { if (node != null) { - ((InternalMNode) node).readUnlock(); + node.readUnlock(); } } return schemas; @@ -938,7 +949,8 @@ public class PlanExecutor implements IPlanExecutor { /** * @param loc index of measurement in insertPlan */ - private MeasurementSchema getSeriesSchema(MNode deviceNode, InsertPlan insertPlan, int loc) throws MetadataException { + private MeasurementSchema getSeriesSchema(MNode deviceNode, InsertPlan insertPlan, int loc) + throws MetadataException { String measurement = insertPlan.getMeasurements()[loc]; String deviceId = insertPlan.getDeviceId(); Object value = 
insertPlan.getValues()[loc]; @@ -959,15 +971,17 @@ public class PlanExecutor implements IPlanExecutor { Path path = new Path(deviceId, measurement); internalCreateTimeseries(path.toString(), dataType); - MeasurementMNode measurementNode = (MeasurementMNode) mManager.getChild(deviceNode, measurement); + MeasurementMNode measurementNode = (MeasurementMNode) mManager + .getChild(deviceNode, measurement); measurementSchema = measurementNode.getSchema(); - if(!isInferType) { + if (!isInferType) { checkType(insertPlan, loc, measurementNode.getSchema().getType()); } } } else if (deviceNode != null) { // device and measurement exists in MTree - MeasurementMNode measurementNode = (MeasurementMNode) MManager.getInstance().getChild(deviceNode, measurement); + MeasurementMNode measurementNode = (MeasurementMNode) MManager.getInstance() + .getChild(deviceNode, measurement); measurementSchema = measurementNode.getSchema(); } else { // device in not in MTree, try the cache @@ -1106,7 +1120,7 @@ public class PlanExecutor implements IPlanExecutor { throw new QueryProcessException(e); } finally { if (node != null) { - ((InternalMNode) node).readUnlock(); + node.readUnlock(); } } } diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java index 424ad52..5949d0e 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java @@ -26,9 +26,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.metadata.mnode.InternalMNode; -import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mnode.MNode; +import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.utils.EnvironmentUtils; import 
org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -142,7 +141,7 @@ public class MManagerImproveTest { } } finally { if (node != null) { - ((InternalMNode) node).readUnlock(); + node.readUnlock(); } } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java index 1173749..25c0557 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java @@ -105,15 +105,15 @@ public class ReadWriteIOUtils { /** - * write if the object not equals null. Eg, object eauals null, then write false. + * write whether the object equals null. E.g., if the object equals null, then write true. */ - public static int writeIsNotNull(Object object, OutputStream outputStream) throws IOException { - return write(object != null, outputStream); + public static int writeIsNull(Object object, OutputStream outputStream) throws IOException { + return write(object == null, outputStream); } /** - * write if the object not equals null. Eg, object eauals null, then write false. + * write whether the object equals null. E.g., if the object equals null, then write true. */ - public static int writeIsNotNull(Object object, ByteBuffer buffer) { - return write(object != null, buffer); + public static int writeIsNull(Object object, ByteBuffer buffer) { + return write(object == null, buffer); } /**
