This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TYQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e466ca06a9f038ca57e16bc919791e393a15085 Author: JackieTien97 <[email protected]> AuthorDate: Sun Mar 14 19:52:08 2021 +0800 need haonan --- .../org/apache/iotdb/db/engine/StorageEngine.java | 4 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 13 +++ .../apache/iotdb/db/engine/memtable/IMemTable.java | 9 ++ .../db/engine/querycontext/QueryDataSource.java | 11 +-- .../db/engine/querycontext/ReadOnlyMemChunk.java | 5 ++ .../engine/storagegroup/StorageGroupProcessor.java | 99 ++++++++++------------ .../db/engine/storagegroup/TsFileProcessor.java | 95 +++++++++++++-------- .../db/engine/storagegroup/TsFileResource.java | 40 ++++----- .../org/apache/iotdb/db/metadata/MManager.java | 65 ++++++++------ .../iotdb/db/query/context/QueryContext.java | 11 ++- .../chunk/metadata/DiskChunkMetadataLoader.java | 5 ++ .../chunk/metadata/MemChunkMetadataLoader.java | 5 ++ .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 1 - .../storagegroup/StorageGroupProcessorTest.java | 23 +++-- .../iotdb/db/engine/storagegroup/TTLTest.java | 34 ++++---- .../engine/storagegroup/TsFileProcessorTest.java | 42 ++++----- .../reader/series/SeriesAggregateReaderTest.java | 20 ++--- .../reader/series/SeriesReaderByTimestampTest.java | 1 - .../tsfile/file/metadata/TimeseriesMetadata.java | 4 + .../file/metadata/VectorTimeSeriesMetadata.java | 29 ++++--- .../read/controller/IChunkMetadataLoader.java | 2 + 21 files changed, 285 insertions(+), 233 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index b774e52..2b5cd02 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -665,10 +665,8 @@ public class StorageEngine implements IService { throws StorageEngineException, QueryProcessException { PartialPath fullPath = (PartialPath) seriesExpression.getSeriesPath(); PartialPath deviceId = fullPath.getDevicePath(); - String measurementId = seriesExpression.getSeriesPath().getMeasurement(); StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId); - return storageGroupProcessor.query( - deviceId, measurementId, context, filePathsManager, seriesExpression.getFilter()); + return storageGroupProcessor.query(fullPath, context, filePathsManager, seriesExpression.getFilter()); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 3538aa1..4d68a78 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -274,7 +274,20 @@ public abstract class AbstractMemTable implements IMemTable { measurement, dataType, encoding, chunkCopy, props, curSize, deletionList); } + + // TODO BY HAONAN HOU @Override + public ReadOnlyMemChunk query( + String deviceId, + String measurement, + IMeasurementSchema schema, + long timeLowerBound, + List<TimeRange> deletionList) + throws IOException, QueryProcessException, MetadataException { + return null; + } + + @Override public void delete( PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) { Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index b3fb16a..e9e9c33 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -94,6 +94,15 @@ public interface IMemTable { List<TimeRange> deletionList) throws IOException, QueryProcessException, MetadataException; + + ReadOnlyMemChunk query( + String deviceId, + String measurement, + IMeasurementSchema schema, + long timeLowerBound, + List<TimeRange> deletionList) + throws IOException, QueryProcessException, MetadataException; + /** putBack all the memory resources. */ void clear(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java index b3dfb98..a3fc963 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java @@ -19,16 +19,13 @@ package org.apache.iotdb.db.engine.querycontext; +import java.util.List; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; -import java.util.List; - public class QueryDataSource { - private PartialPath seriesPath; private List<TsFileResource> seqResources; private List<TsFileResource> unseqResources; @@ -36,18 +33,12 @@ public class QueryDataSource { private long dataTTL = Long.MAX_VALUE; public QueryDataSource( - PartialPath seriesPath, List<TsFileResource> seqResources, List<TsFileResource> unseqResources) { - this.seriesPath = seriesPath; this.seqResources = seqResources; this.unseqResources = unseqResources; } - public PartialPath getSeriesPath() { - return seriesPath; - } - public List<TsFileResource> getSeqResources() { return seqResources; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java index 8236e15..c635968 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java @@ -59,6 +59,7 @@ public class ReadOnlyMemChunk { private int chunkDataSize; + // TODO BY HAONAN HOU public ReadOnlyMemChunk( String measurementUid, TSDataType dataType, @@ -124,6 +125,9 @@ public class ReadOnlyMemChunk { case DOUBLE: statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); break; + case VECTOR: + statsByType.update(timeValuePair.getTimestamp()); + break; default: throw new QueryProcessException("Unsupported data type:" + dataType); } @@ -143,6 +147,7 @@ public class ReadOnlyMemChunk { return !chunkPointReader.hasNextTimeValuePair(); } + // TODO BY HAONAN HOU public ChunkMetadata getChunkMetaData() { return cachedMetaData; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 1f9d80e..e67b691 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,6 +18,38 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -77,43 +109,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; - -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Deque; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; -import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one * TsFileProcessor in the working status. <br> @@ -1535,8 +1533,7 @@ public class StorageGroupProcessor { // TODO need a read lock, please consider the concurrency with flush manager threads. public QueryDataSource query( - PartialPath deviceId, - String measurementId, + PartialPath fullPath, QueryContext context, QueryFileManager filePathsManager, Filter timeFilter) @@ -1547,8 +1544,7 @@ public class StorageGroupProcessor { getFileResourceListForQuery( tsFileManagement.getTsFileList(true), upgradeSeqFileList, - deviceId, - measurementId, + fullPath, context, timeFilter, true); @@ -1556,12 +1552,11 @@ public class StorageGroupProcessor { getFileResourceListForQuery( tsFileManagement.getTsFileList(false), upgradeUnseqFileList, - deviceId, - measurementId, + fullPath, context, timeFilter, false); - QueryDataSource dataSource = new QueryDataSource(deviceId, seqResources, unseqResources); + QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources); // used files should be added before mergeLock is unlocked, or they may be deleted by // running merge // is null only in tests @@ -1592,24 +1587,24 @@ public class StorageGroupProcessor { private List<TsFileResource> getFileResourceListForQuery( Collection<TsFileResource> tsFileResources, List<TsFileResource> upgradeTsFileResources, - PartialPath deviceId, - String measurementId, + PartialPath fullPath, QueryContext context, Filter timeFilter, boolean isSeq) throws MetadataException { + String deviceId = fullPath.getDevice(); if (config.isDebugOn()) { DEBUG_LOGGER.info( "Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}", - deviceId.getFullPath(), - measurementId, + deviceId, + fullPath.getMeasurement(), tsFileResources, isSeq, (timeFilter == null ? "null" : timeFilter)); } - IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId); + IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(fullPath); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); long timeLowerBound = @@ -1618,7 +1613,7 @@ public class StorageGroupProcessor { // for upgrade files and old files must be closed for (TsFileResource tsFileResource : upgradeTsFileResources) { - if (!tsFileResource.isSatisfied(deviceId.getFullPath(), timeFilter, isSeq, dataTTL)) { + if (!tsFileResource.isSatisfied(deviceId, timeFilter, isSeq, dataTTL)) { continue; } closeQueryLock.readLock().lock(); @@ -1630,7 +1625,7 @@ public class StorageGroupProcessor { } for (TsFileResource tsFileResource : tsFileResources) { - if (!tsFileResource.isSatisfied(deviceId.getFullPath(), timeFilter, isSeq, dataTTL)) { + if (!tsFileResource.isSatisfied(fullPath.getDevice(), timeFilter, isSeq, dataTTL)) { continue; } closeQueryLock.readLock().lock(); @@ -1641,11 +1636,9 @@ public class StorageGroupProcessor { tsFileResource .getUnsealedFileProcessor() .query( - deviceId.getFullPath(), - measurementId, - schema.getType(), - schema.getEncodingType(), - schema.getProps(), + deviceId, + fullPath.getMeasurement(), + schema, context, tsfileResourcesForQuery); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 1e705ab..773e5dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -18,6 +18,16 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -57,28 +67,18 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - @SuppressWarnings("java:S1135") // ignore todos public class TsFileProcessor { @@ -91,7 +91,9 @@ public class TsFileProcessor { private StorageGroupInfo storageGroupInfo; private TsFileProcessorInfo tsFileProcessorInfo; - /** sync this object in query() and asyncTryToFlush() */ + /** + * sync this object in query() and asyncTryToFlush() + */ private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>(); private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>(); @@ -113,7 +115,9 @@ public class TsFileProcessor { private IMemTable workMemTable; - /** this callback is called before the workMemtable is added into the flushingMemTables. */ + /** + * this callback is called before the workMemtable is added into the flushingMemTables. + */ private final UpdateEndTimeCallBack updateLatestFlushTimeCallback; private WriteLogNode logNode; @@ -220,9 +224,9 @@ public class TsFileProcessor { * the range [start, end) * * @param insertTabletPlan insert a tablet of a device - * @param start start index of rows to be inserted in insertTabletPlan - * @param end end index of rows to be inserted in insertTabletPlan - * @param results result array + * @param start start index of rows to be inserted in insertTabletPlan + * @param end end index of rows to be inserted in insertTabletPlan + * @param results result array */ public void insertTablet( InsertTabletPlan insertTabletPlan, int start, int end, TSStatus[] results) @@ -637,7 +641,9 @@ public class TsFileProcessor { } } - /** put the working memtable into flushing list and set the working memtable to null */ + /** + * put the working memtable into flushing list and set the working memtable to null + */ public void asyncFlush() { flushQueryLock.writeLock().lock(); if (logger.isDebugEnabled()) { @@ -704,7 +710,9 @@ public class TsFileProcessor { FlushManager.getInstance().registerTsFileProcessor(this); } - /** put back the memtable to MemTablePool and make metadata in writer visible */ + /** + * put back the memtable to MemTablePool and make metadata in writer visible + */ private void releaseFlushedMemTable(IMemTable memTable) { flushQueryLock.writeLock().lock(); if (logger.isDebugEnabled()) { @@ -1043,18 +1051,14 @@ public class TsFileProcessor { * memtables and then compact them into one TimeValuePairSorter). Then get the related * ChunkMetadata of data on disk. * - * @param deviceId device id + * @param deviceId device id * @param measurementId measurements id - * @param dataType data type - * @param encoding encoding */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void query( String deviceId, String measurementId, - TSDataType dataType, - TSEncoding encoding, - Map<String, String> props, + IMeasurementSchema schema, QueryContext context, List<TsFileResource> tsfileResourcesForQuery) throws IOException, MetadataException { @@ -1078,9 +1082,7 @@ public class TsFileProcessor { flushingMemTable.query( deviceId, measurementId, - dataType, - encoding, - props, + schema, context.getQueryTimeLowerBound(), deletionList); if (memChunk != null) { @@ -1092,9 +1094,7 @@ public class TsFileProcessor { workMemTable.query( deviceId, measurementId, - dataType, - encoding, - props, + schema, context.getQueryTimeLowerBound(), null); if (memChunk != null) { @@ -1108,8 +1108,33 @@ public class TsFileProcessor { modificationFile, new PartialPath(deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId)); - List<ChunkMetadata> chunkMetadataList = - writer.getVisibleMetadataList(deviceId, measurementId, dataType); + List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); + if (schema instanceof VectorMeasurementSchema) { + List<ChunkMetadata> timeChunkMetadataList = + writer.getVisibleMetadataList(deviceId, measurementId, schema.getType()); + List<List<ChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); + List<String> valueMeasurementIdList = schema.getValueMeasurementIdList(); + List<TSDataType> valueDataTypeList = schema.getValueTSDataTypeList(); + for (int i = 0; i < valueMeasurementIdList.size(); i++) { + valueChunkMetadataList.add(writer + .getVisibleMetadataList(deviceId, valueMeasurementIdList.get(i), + valueDataTypeList.get(i))); + } + + + for (int i = 0; i < timeChunkMetadataList.size(); i++) { + List<IChunkMetadata> valueChunkMetadata = new ArrayList<>(); + for (List<ChunkMetadata> chunkMetadata : valueChunkMetadataList) { + valueChunkMetadata.add(chunkMetadata.get(i)); + } + chunkMetadataList + .add(new VectorChunkMetadata(timeChunkMetadataList.get(i), valueChunkMetadata)); + } + } else { + chunkMetadataList = + new ArrayList<>(writer.getVisibleMetadataList(deviceId, measurementId, schema.getType())); + } + QueryUtils.modifyChunkMetaData(chunkMetadataList, modifications); chunkMetadataList.removeIf(context::chunkNotSatisfy); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 6d4f58b..57472f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -18,6 +18,20 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.ModificationFile; @@ -32,7 +46,6 @@ import org.apache.iotdb.db.service.UpgradeSevice; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -41,25 +54,9 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Random; -import java.util.Set; - @SuppressWarnings("java:S1135") // ignore todos public class TsFileResource { @@ -103,7 +100,7 @@ public class TsFileResource { * Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query * process. */ - private List<ChunkMetadata> chunkMetadataList; + private List<IChunkMetadata> chunkMetadataList; /** Mem chunk data. Only be set in a temporal TsFileResource in a query process. */ private List<ReadOnlyMemChunk> readOnlyMemChunk; @@ -184,7 +181,7 @@ public class TsFileResource { /** unsealed TsFile */ public TsFileResource( List<ReadOnlyMemChunk> readOnlyMemChunk, - List<ChunkMetadata> chunkMetadataList, + List<IChunkMetadata> chunkMetadataList, TsFileResource originTsFileResource) throws IOException { this.file = originTsFileResource.file; @@ -206,7 +203,6 @@ public class TsFileResource { } private void generateTimeSeriesMetadata() throws IOException { - timeSeriesMetadata = new TimeseriesMetadata(); timeSeriesMetadata.setOffsetOfChunkMetaDataList(-1); timeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1); @@ -223,7 +219,7 @@ public class TsFileResource { Statistics<?> seriesStatistics = Statistics.getStatsByType(timeSeriesMetadata.getTSDataType()); // flush chunkMetadataList one by one - for (ChunkMetadata chunkMetadata : chunkMetadataList) { + for (IChunkMetadata chunkMetadata : chunkMetadataList) { seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); } @@ -234,7 +230,7 @@ public class TsFileResource { } timeSeriesMetadata.setStatistics(seriesStatistics); } else { - timeSeriesMetadata = null; + this.timeSeriesMetadata = null; } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 76703a8..f6c6974 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -18,6 +18,32 @@ */ package org.apache.iotdb.db.metadata; +import static java.util.stream.Collectors.toList; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -73,37 +99,9 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static java.util.stream.Collectors.toList; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; - /** * This class takes the responsibility of serialization of all the metadata info and persistent it * into files. This class contains all the interfaces to modify the metadata for delta system. All @@ -1015,6 +1013,17 @@ public class MManager { return null; } + // TODO BY ZESONG SUN + public IMeasurementSchema getSeriesSchema(PartialPath fullPath) + throws MetadataException { + MNode node = mtree.getNodeByPath(fullPath.getDevicePath()); + MNode leaf = node.getChild(fullPath.getMeasurement()); + if (leaf != null) { + return ((MeasurementMNode) leaf).getSchema(); + } + return null; + } + /** * Get child node path in the next level of the given path. * diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java index 3febfa9..0c53d08 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java @@ -19,16 +19,15 @@ package org.apache.iotdb.db.query.context; -import org.apache.iotdb.db.engine.modification.Modification; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; /** QueryContext contains the shared information with in a query. */ public class QueryContext { @@ -95,7 +94,7 @@ public class QueryContext { this.queryTimeLowerBound = queryTimeLowerBound; } - public boolean chunkNotSatisfy(ChunkMetadata chunkMetaData) { + public boolean chunkNotSatisfy(IChunkMetadata chunkMetaData) { return chunkMetaData.getEndTime() < queryTimeLowerBound; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java index 3a6fc2a..4ac0080 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java @@ -72,6 +72,11 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { return chunkMetadataList; } + @Override + public boolean isMemChunkMetadataLoader() { + return false; + } + public static void setDiskChunkLoader( List<IChunkMetadata> chunkMetadataList, TsFileResource resource, diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java index 8a22d0b..cf022e3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java @@ -73,4 +73,9 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader { } return chunkMetadataList; } + + @Override + public boolean isMemChunkMetadataLoader() { + return true; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 050de31..0f9297a 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -157,7 +157,6 @@ public class FileLoaderUtils { */ public static List<IPageReader> loadPageReaderList( IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException { - // TODO memory Vector chunk metadata if (chunkMetaData == null) { throw new IOException("Can't init null chunkMeta"); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index e68c4af..6b64b13 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -125,9 +126,7 @@ public class StorageGroupProcessorTest { tsfileProcessor.query( deviceId, measurementId, - TSDataType.INT32, - TSEncoding.RLE, - Collections.emptyMap(), + new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.RLE, CompressionType.UNCOMPRESSED, Collections.emptyMap()), new QueryContext(), tsfileResourcesForQuery); } @@ -162,7 +161,7 @@ public class StorageGroupProcessorTest { } processor.syncCloseAllWorkingTsFileProcessors(); QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { Assert.assertTrue(resource.isClosed()); @@ -191,7 +190,7 @@ public class StorageGroupProcessorTest { processor.syncCloseAllWorkingTsFileProcessors(); QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); } @@ -252,7 +251,7 @@ public class StorageGroupProcessorTest { processor.syncCloseAllWorkingTsFileProcessors(); QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(1, queryDataSource.getUnseqResources().size()); @@ -282,7 +281,7 @@ public class StorageGroupProcessorTest { processor.syncCloseAllWorkingTsFileProcessors(); QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(10, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { @@ -322,7 +321,7 @@ public class StorageGroupProcessorTest { } QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { @@ -404,7 +403,7 @@ public class StorageGroupProcessorTest { } QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); @@ -486,7 +485,7 @@ public class StorageGroupProcessorTest { } QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); @@ -568,7 +567,7 @@ public class StorageGroupProcessorTest { } QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); @@ -606,7 +605,7 @@ public class StorageGroupProcessorTest { } QueryDataSource queryDataSource = - processor.query(new PartialPath(deviceId), measurementId, context, null, null); + processor.query(new PartialPath(deviceId, measurementId), context, null, null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { Assert.assertTrue(resource.isClosed()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index 1943ef3..a76f09c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -20,6 +20,19 @@ package org.apache.iotdb.db.engine.storagegroup; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; @@ -54,25 +67,10 @@ import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class TTLTest { private String sg1 = "root.TTL_SG1"; @@ -223,7 +221,7 @@ public class TTLTest { // files before ttl QueryDataSource dataSource = storageGroupProcessor.query( - new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null); + new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null); List<TsFileResource> seqResource = dataSource.getSeqResources(); List<TsFileResource> unseqResource = dataSource.getUnseqResources(); assertEquals(4, seqResource.size()); @@ -234,7 +232,7 @@ public class TTLTest { // files after ttl dataSource = storageGroupProcessor.query( - new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null); + new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null); seqResource = dataSource.getSeqResources(); unseqResource = dataSource.getUnseqResources(); assertTrue(seqResource.size() < 4); @@ -269,7 +267,7 @@ public class TTLTest { storageGroupProcessor.setDataTTL(0); dataSource = storageGroupProcessor.query( - new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null); + new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null); seqResource = dataSource.getSeqResources(); unseqResource = dataSource.getUnseqResources(); assertEquals(0, seqResource.size()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index f972a83..276cacd 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -18,6 +18,15 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; @@ -30,30 +39,21 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; - import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - public class TsFileProcessorTest { private TsFileProcessor processor; @@ -104,7 +104,7 @@ public class TsFileProcessorTest { SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.isEmpty()); for (int i = 1; i <= 100; i++) { @@ -116,7 +116,7 @@ public class TsFileProcessorTest { // query data in memory tsfileResourcesForQuery.clear(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(); for (ReadOnlyMemChunk chunk : memChunks) { @@ -134,7 +134,7 @@ public class TsFileProcessorTest { tsfileResourcesForQuery.clear(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); assertEquals( @@ -166,7 +166,7 @@ public class TsFileProcessorTest { SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.isEmpty()); for (int i = 1; i <= 100; i++) { @@ -178,7 +178,7 @@ public class TsFileProcessorTest { // query data in memory tsfileResourcesForQuery.clear(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); int num = 1; List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(); @@ -197,7 +197,7 @@ public class TsFileProcessorTest { tsfileResourcesForQuery.clear(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); assertEquals( @@ -254,7 +254,7 @@ public class TsFileProcessorTest { SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.isEmpty()); for (int flushId = 0; flushId < 10; flushId++) { @@ -269,7 +269,7 @@ public class TsFileProcessorTest { tsfileResourcesForQuery.clear(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertFalse(tsfileResourcesForQuery.isEmpty()); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); assertEquals(10, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); @@ -302,7 +302,7 @@ public class TsFileProcessorTest { List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.isEmpty()); for (int i = 1; i <= 100; i++) { @@ -314,7 +314,7 @@ public class TsFileProcessorTest { // query data in memory tsfileResourcesForQuery.clear(); processor.query( - deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); + deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery); assertFalse(tsfileResourcesForQuery.isEmpty()); assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(); diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java index 55eea5c..a713157 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java @@ -19,6 +19,15 @@ package org.apache.iotdb.db.query.reader.series; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.metadata.IllegalPathException; @@ -33,19 +42,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.junit.Assert.*; - public class SeriesAggregateReaderTest { private static final String SERIES_READER_TEST_SG = "root.seriesReaderTest"; @@ -71,7 +71,7 @@ public class SeriesAggregateReaderTest { PartialPath path = new PartialPath(SERIES_READER_TEST_SG + ".device0.sensor0"); Set<String> allSensors = new HashSet<>(); allSensors.add("sensor0"); - QueryDataSource queryDataSource = new QueryDataSource(path, seqResources, unseqResources); + QueryDataSource queryDataSource = new QueryDataSource(seqResources, unseqResources); SeriesAggregateReader seriesReader = new SeriesAggregateReader( path, diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java index 4e71afa..129f18d 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java @@ -63,7 +63,6 @@ public class SeriesReaderByTimestampTest { public void test() throws IOException, IllegalPathException { QueryDataSource dataSource = new QueryDataSource( - new PartialPath(SERIES_READER_TEST_SG + ".device0.sensor0"), seqResources, unseqResources); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java index 10efcb7..e86fcb2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java @@ -189,6 +189,10 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata { this.chunkMetadataLoader = chunkMetadataLoader; } + public IChunkMetadataLoader getChunkMetadataLoader() { + return chunkMetadataLoader; + } + @Override public List<IChunkMetadata> loadChunkMetadataList() throws IOException { return chunkMetadataLoader.loadChunkMetadataList(this); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java index 20a25df..bfa8e74 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java @@ -62,23 +62,26 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata { @Override public List<IChunkMetadata> loadChunkMetadataList() throws IOException { - List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.loadChunkMetadataList(); - List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); - for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) { - valueChunkMetadataList.add(metadata.loadChunkMetadataList()); - } + if (timeseriesMetadata.getChunkMetadataLoader().isMemChunkMetadataLoader()) { + return timeseriesMetadata.loadChunkMetadataList(); + } else { + List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.loadChunkMetadataList(); + List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); + for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) { + valueChunkMetadataList.add(metadata.loadChunkMetadataList()); + } - List<IChunkMetadata> res = new ArrayList<>(); + List<IChunkMetadata> res = new ArrayList<>(); - for (int i = 0; i < timeChunkMetadata.size(); i++) { - List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); - for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) { - chunkMetadataList.add(chunkMetadata.get(i)); + for (int i = 0; i < timeChunkMetadata.size(); i++) { + List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); + for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) { + chunkMetadataList.add(chunkMetadata.get(i)); + } + res.add(new VectorChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList)); } - res.add(new VectorChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList)); + return res; } - - return res; } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java index 093219d..71371a4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java @@ -29,4 +29,6 @@ public interface IChunkMetadataLoader { /** read all chunk metadata of one time series in one file. */ List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) throws IOException; + + boolean isMemChunkMetadataLoader(); }
