This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch jira_631 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4339c68bbbfb0886d3661a3308b90feb83f681b1 Author: samperson1997 <[email protected]> AuthorDate: Thu Apr 30 17:10:41 2020 +0800 [IOTDB-631] Using new TsFile MetadataIndex to optimize query and cache --- .../db/engine/cache/TimeSeriesMetadataCache.java | 63 +++++++++++------ .../tsfile/file/metadata/MetadataIndexNode.java | 2 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 79 +++++++++++++++++----- .../read/controller/MetadataQuerierByFileImpl.java | 10 ++- 4 files changed, 109 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java index e586cde..1fb7c8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java @@ -19,6 +19,13 @@ package org.apache.iotdb.db.engine.cache; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -30,22 +37,17 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.BloomFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching - * strategy is LRU. + * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching strategy is + * LRU. */ public class TimeSeriesMetadataCache { private static final Logger logger = LoggerFactory.getLogger(TimeSeriesMetadataCache.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE = config.getAllocateMemoryForTimeSeriesMetaDataCache(); + private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE = config + .getAllocateMemoryForTimeSeriesMetaDataCache(); private static boolean cacheEnable = config.isMetaDataCacheEnable(); private final LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache; @@ -58,9 +60,11 @@ public class TimeSeriesMetadataCache { private TimeSeriesMetadataCache() { logger.info("TimeseriesMetadataCache size = " + MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE); - lruCache = new LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata>(MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE, true) { + lruCache = new LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata>( + MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE, true) { int count = 0; long averageSize = 0; + @Override protected long calEntrySize(TimeSeriesMetadataCacheKey key, TimeseriesMetadata value) { if (count < 10) { @@ -83,7 +87,8 @@ public class TimeSeriesMetadataCache { return TimeSeriesMetadataCache.TimeSeriesMetadataCacheHolder.INSTANCE; } - public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String> allSensors) throws IOException { + public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String> allSensors) + throws IOException { if (!cacheEnable) { // bloom filter part TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(key.filePath); @@ -125,9 +130,20 @@ public class TimeSeriesMetadataCache { return null; } TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); - TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata(new Path(key.device, key.measurement)); - lruCache.put(key, timeseriesMetadata); - return timeseriesMetadata; + TimeseriesMetadata resultTimeseriesMetadata = reader + .readTimeseriesMetadata(new Path(key.device, key.measurement)); + lruCache.put(key, resultTimeseriesMetadata); + + List<TimeseriesMetadata> timeSeriesMetadataList = reader + .readTimeseriesMetadata(key.device, allSensors); + if (!allSensors.isEmpty()) { + // put TimeSeriesMetadata of all sensors used in this query into cache + timeSeriesMetadataList.forEach(timeseriesMetadata -> { + lruCache.put(new TimeSeriesMetadataCacheKey(key.filePath, key.device, + timeseriesMetadata.getMeasurementId()), timeseriesMetadata); + }); + } + return resultTimeseriesMetadata; } catch (IOException e) { logger.error("something wrong happened while reading {}", key.filePath); throw e; @@ -143,9 +159,9 @@ public class TimeSeriesMetadataCache { return; } logger.debug( - "[TimeSeriesMetadata cache {}hit] The number of requests for cache is {}, hit rate is {}.", - isHit ? "" : "didn't ", cacheRequestNum.get(), - cacheHitNum.get() * 1.0 / cacheRequestNum.get()); + "[TimeSeriesMetadata cache {}hit] The number of requests for cache is {}, hit rate is {}.", + isHit ? "" : "didn't ", cacheRequestNum.get(), + cacheHitNum.get() * 1.0 / cacheRequestNum.get()); } public double calculateTimeSeriesMetadataHitRatio() { @@ -177,6 +193,7 @@ public class TimeSeriesMetadataCache { } public static class TimeSeriesMetadataCacheKey { + private String filePath; private String device; private String measurement; @@ -189,12 +206,16 @@ public class TimeSeriesMetadataCache { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } TimeSeriesMetadataCacheKey that = (TimeSeriesMetadataCacheKey) o; return Objects.equals(filePath, that.filePath) && - Objects.equals(device, that.device) && - Objects.equals(measurement, that.measurement); + Objects.equals(device, that.device) && + Objects.equals(measurement, that.measurement); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java index 275521e..28fe5df 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java @@ -120,6 +120,6 @@ public class MetadataIndexNode { return mid; // key found } } - return low - 1; // key not found + return low == 0 ? low : low - 1; // key not found } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 1e3c8a7..b11a524 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -34,7 +33,6 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.compress.IUnCompressor; @@ -178,9 +176,8 @@ public class TsFileSequenceReader implements AutoCloseable { metadataSize.flip(); // read file metadata size and position fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize); - fileMetadataPos = - tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES - - fileMetadataSize; + fileMetadataPos = tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length + - Integer.BYTES - fileMetadataSize; } } @@ -314,13 +311,13 @@ public class TsFileSequenceReader implements AutoCloseable { public TimeseriesMetadata readTimeseriesMetadata(Path path) throws IOException { readFileMetadata(); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); - Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetaDataAndEndOffset( + Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset( deviceMetadataIndexNode, path.getDevice(), MetadataIndexNodeType.INTERNAL_DEVICE); ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); while (!metadataIndexPair.left.getChildNodeType() .equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); - metadataIndexPair = getMetaDataAndEndOffset(metadataIndexNode, + metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), MetadataIndexNodeType.INTERNAL_MEASUREMENT); } List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); @@ -328,14 +325,62 @@ public class TsFileSequenceReader implements AutoCloseable { while (buffer.hasRemaining()) { timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer)); } - String[] measurementNameList = timeseriesMetadataList.stream() - .map(TimeseriesMetadata::getMeasurementId).collect(Collectors.toList()) - .toArray(new String[timeseriesMetadataList.size()]); - // return null if path does not exist in the TsFile - int searchResult; - return (searchResult = Arrays.binarySearch(measurementNameList, path.getMeasurement())) >= 0 - ? timeseriesMetadataList.get(searchResult) : null; + int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, + path.getMeasurement()); + return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null; + } + + public List<TimeseriesMetadata> readTimeseriesMetadata(String device, Set<String> measurements) + throws IOException { + readFileMetadata(); + MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); + Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset( + deviceMetadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE); + List<TimeseriesMetadata> resultTimeseriesMetadataList = new ArrayList<>(); + for (String measurement : measurements) { + ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair; + List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); + while (!measurementMetadataIndexPair.left.getChildNodeType() + .equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); + measurementMetadataIndexPair = getMetadataAndEndOffset(metadataIndexNode, + measurement, MetadataIndexNodeType.INTERNAL_MEASUREMENT); + } + buffer = readData(measurementMetadataIndexPair.left.getOffset(), + measurementMetadataIndexPair.right); + while (buffer.hasRemaining()) { + timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer)); + } + int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, + measurement); + if (searchResult >= 0) { + resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult)); + } + } + return resultTimeseriesMetadataList; + } + + private int binarySearchInTimeseriesMetadataList(List<TimeseriesMetadata> timeseriesMetadataList, + String key) { + int low = 0; + int high = timeseriesMetadataList.size() - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + TimeseriesMetadata midVal = timeseriesMetadataList.get(mid); + int cmp = midVal.getMeasurementId().compareTo(key); + + if (cmp < 0) { + low = mid + 1; + } else if (cmp > 0) { + high = mid - 1; + } else { + return mid; // key found + } + } + return -1; // key not found } public List<String> getAllDevices() throws IOException { @@ -489,7 +534,7 @@ public class TsFileSequenceReader implements AutoCloseable { private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException { MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); - Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetaDataAndEndOffset( + Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset( metadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE); ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); @@ -501,14 +546,14 @@ public class TsFileSequenceReader implements AutoCloseable { return deviceTimeseriesMetadata; } - private Pair<MetadataIndexEntry, Long> getMetaDataAndEndOffset(MetadataIndexNode metadataIndex, + private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex, String name, MetadataIndexNodeType type) throws IOException { Pair<MetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(name); if (!childIndexEntry.left.getChildNodeType().equals(type)) { return childIndexEntry; } ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); - return getMetaDataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name, type); + return getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name, type); } /** diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java index bb58ec4..02582f0 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java @@ -115,13 +115,11 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier { continue; } - Map<String, TimeseriesMetadata> timeseriesMetaDataInDevice = tsFileReader - .readDeviceMetadata(selectedDevice); + List<TimeseriesMetadata> timeseriesMetaDataList = tsFileReader + .readTimeseriesMetadata(selectedDevice, selectedMeasurements); List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); - for (Map.Entry<String, TimeseriesMetadata> entry : timeseriesMetaDataInDevice.entrySet()) { - if (selectedMeasurements.contains(entry.getKey())) { - chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(entry.getValue())); - } + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetaDataList) { + chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(timeseriesMetadata)); } // d1 for (ChunkMetadata chunkMetaData : chunkMetadataList) {
