This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c22c1405ba extract interface from path (#5662)
c22c1405ba is described below
commit c22c1405ba85c94952d8d625116fadcca0cdbe2a
Author: lisijia <[email protected]>
AuthorDate: Tue Apr 26 14:51:51 2022 +0800
extract interface from path (#5662)
---
.../cluster/query/reader/ClusterReaderFactory.java | 20 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 4 +-
.../db/engine/storagegroup/TsFileProcessor.java | 4 +-
.../db/engine/storagegroup/TsFileResource.java | 6 +-
.../apache/iotdb/db/metadata/path/AlignedPath.java | 269 -----------
.../iotdb/db/metadata/path/MeasurementPath.java | 197 --------
.../apache/iotdb/db/metadata/path/PartialPath.java | 96 ----
.../ResourceByPathUtils.java} | 535 +++++++++++----------
.../iotdb/db/query/executor/LastQueryExecutor.java | 4 +-
.../query/reader/series/SeriesAggregateReader.java | 20 +-
.../reader/series/SeriesRawDataBatchReader.java | 39 +-
.../reader/series/SeriesReaderByTimestamp.java | 6 +-
12 files changed, 351 insertions(+), 849 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index d848aabd8d..ae367dec92 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -555,15 +556,16 @@ public class ClusterReaderFactory {
QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context,
timeFilter, ascending);
valueFilter = queryDataSource.updateFilterUsingTTL(valueFilter);
- return path.createSeriesReader(
- allSensors,
- dataType,
- context,
- queryDataSource,
- timeFilter,
- valueFilter,
- new SlotTsFileFilter(requiredSlots),
- ascending);
+ return ResourceByPathUtils.getResourceInstance(path)
+ .createSeriesReader(
+ allSensors,
+ dataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ valueFilter,
+ new SlotTsFileFilter(requiredSlots),
+ ascending);
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index ad2d99616b..169cb4020c 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -634,7 +635,8 @@ public abstract class AbstractMemTable implements IMemTable
{
public ReadOnlyMemChunk query(
PartialPath fullPath, long ttlLowerBound, List<Pair<Modification,
IMemTable>> modsToMemtable)
throws IOException, QueryProcessException {
- return fullPath.getReadOnlyMemChunkFromMemTable(this, modsToMemtable,
ttlLowerBound);
+ return ResourceByPathUtils.getResourceInstance(fullPath)
+ .getReadOnlyMemChunkFromMemTable(this, modsToMemtable, ttlLowerBound);
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 7274954381..b7c8896b0f 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -1482,7 +1483,8 @@ public class TsFileProcessor {
}
List<IChunkMetadata> chunkMetadataList =
- seriesPath.getVisibleMetadataListFromWriter(writer,
tsFileResource, context);
+ ResourceByPathUtils.getResourceInstance(seriesPath)
+ .getVisibleMetadataListFromWriter(writer, tsFileResource,
context);
// get in memory data
if (!readOnlyMemChunks.isEmpty() || !chunkMetadataList.isEmpty()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 610e6c47e4..23fdfb3060 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -1001,8 +1002,9 @@ public class TsFileResource {
for (PartialPath path : pathToChunkMetadataListMap.keySet()) {
pathToTimeSeriesMetadataMap.put(
path,
- path.generateTimeSeriesMetadata(
- pathToReadOnlyMemChunkMap.get(path),
pathToChunkMetadataListMap.get(path)));
+ ResourceByPathUtils.getResourceInstance(path)
+ .generateTimeSeriesMetadata(
+ pathToReadOnlyMemChunkMap.get(path),
pathToChunkMetadataListMap.get(path)));
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index ee0e6a768a..36684490de 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -19,58 +19,23 @@
package org.apache.iotdb.db.metadata.path;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
-import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.fill.AlignedLastPointReader;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
-import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
* VectorPartialPath represents many fullPaths of aligned timeseries. In the
AlignedPath, the nodes
@@ -250,240 +215,6 @@ public class AlignedPath extends PartialPath {
return Objects.hash(super.hashCode(), measurementList);
}
- @Override
- public AlignedLastPointReader createLastPointReader(
- TSDataType dataType,
- Set<String> deviceMeasurements,
- QueryContext context,
- QueryDataSource dataSource,
- long queryTime,
- Filter timeFilter) {
- return new AlignedLastPointReader(
- this, dataType, deviceMeasurements, context, dataSource, queryTime,
timeFilter);
- }
-
- @Override
- public AlignedSeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- QueryDataSource dataSource,
- Filter timeFilter,
- Filter valueFilter,
- TsFileFilter fileFilter,
- boolean ascending) {
- return new AlignedSeriesReader(
- this,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
- }
-
- @Override
- @TestOnly
- public AlignedSeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- return new AlignedSeriesReader(
- this,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- }
-
- @Override
- public TsFileResource createTsFileResource(
- List<ReadOnlyMemChunk> readOnlyMemChunk,
- List<IChunkMetadata> chunkMetadataList,
- TsFileResource originTsFileResource)
- throws IOException {
- TsFileResource tsFileResource =
- new TsFileResource(this, readOnlyMemChunk, chunkMetadataList,
originTsFileResource);
- tsFileResource.setTimeSeriesMetadata(
- this, generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
- return tsFileResource;
- }
-
- /**
- * Because the unclosed tsfile don't have TimeSeriesMetadata and memtables
in the memory don't
- * have chunkMetadata, but query will use these, so we need to generate it
for them.
- */
- public AlignedTimeSeriesMetadata generateTimeSeriesMetadata(
- List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
- throws IOException {
- TimeseriesMetadata timeTimeSeriesMetadata = new TimeseriesMetadata();
- timeTimeSeriesMetadata.setOffsetOfChunkMetaDataList(-1);
- timeTimeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1);
- timeTimeSeriesMetadata.setMeasurementId("");
- timeTimeSeriesMetadata.setTSDataType(TSDataType.VECTOR);
-
- Statistics<? extends Serializable> timeStatistics =
- Statistics.getStatsByType(timeTimeSeriesMetadata.getTSDataType());
-
- // init each value time series meta
- List<TimeseriesMetadata> valueTimeSeriesMetadataList = new ArrayList<>();
- for (IMeasurementSchema valueChunkMetadata : schemaList) {
- TimeseriesMetadata valueMetadata = new TimeseriesMetadata();
- valueMetadata.setOffsetOfChunkMetaDataList(-1);
- valueMetadata.setDataSizeOfChunkMetaDataList(-1);
- valueMetadata.setMeasurementId(valueChunkMetadata.getMeasurementId());
- valueMetadata.setTSDataType(valueChunkMetadata.getType());
-
valueMetadata.setStatistics(Statistics.getStatsByType(valueChunkMetadata.getType()));
- valueTimeSeriesMetadataList.add(valueMetadata);
- }
-
- boolean[] exist = new boolean[schemaList.size()];
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata)
chunkMetadata;
-
timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
- for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
- if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) {
- exist[i] = true;
- valueTimeSeriesMetadataList
- .get(i)
- .getStatistics()
- .mergeStatistics(
-
alignedChunkMetadata.getValueChunkMetadataList().get(i).getStatistics());
- }
- }
- }
-
- for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) {
- if (!memChunk.isEmpty()) {
- AlignedChunkMetadata alignedChunkMetadata =
- (AlignedChunkMetadata) memChunk.getChunkMetaData();
-
timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
- for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
- if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null)
{
- exist[i] = true;
- valueTimeSeriesMetadataList
- .get(i)
- .getStatistics()
- .mergeStatistics(
-
alignedChunkMetadata.getValueChunkMetadataList().get(i).getStatistics());
- }
- }
- }
- }
- timeTimeSeriesMetadata.setStatistics(timeStatistics);
-
- for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
- if (!exist[i]) {
- valueTimeSeriesMetadataList.set(i, null);
- }
- }
-
- return new AlignedTimeSeriesMetadata(timeTimeSeriesMetadata,
valueTimeSeriesMetadataList);
- }
-
- @Override
- public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
- IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound)
- throws QueryProcessException, IOException {
- Map<IDeviceID, IWritableMemChunkGroup> memTableMap =
memTable.getMemTableMap();
- IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(this);
-
- // check If memtable contains this path
- if (!memTableMap.containsKey(deviceID)) {
- return null;
- }
- AlignedWritableMemChunk alignedMemChunk =
- ((AlignedWritableMemChunkGroup)
memTableMap.get(deviceID)).getAlignedMemChunk();
- boolean containsMeasurement = false;
- for (String measurement : measurementList) {
- if (alignedMemChunk.containsMeasurement(measurement)) {
- containsMeasurement = true;
- break;
- }
- }
- if (!containsMeasurement) {
- return null;
- }
- // get sorted tv list is synchronized so different query can get right
sorted list reference
- TVList alignedTvListCopy =
alignedMemChunk.getSortedTvListForQuery(schemaList);
- int curSize = alignedTvListCopy.rowCount();
- List<List<TimeRange>> deletionList = null;
- if (modsToMemtable != null) {
- deletionList = constructDeletionList(memTable, modsToMemtable,
timeLowerBound);
- }
- return new AlignedReadOnlyMemChunk(
- getMeasurementSchema(), alignedTvListCopy, curSize, deletionList);
- }
-
- private List<List<TimeRange>> constructDeletionList(
- IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound) {
- List<List<TimeRange>> deletionList = new ArrayList<>();
- for (String measurement : measurementList) {
- List<TimeRange> columnDeletionList = new ArrayList<>();
- columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
- for (Modification modification : getModificationsForMemtable(memTable,
modsToMemtable)) {
- if (modification instanceof Deletion) {
- Deletion deletion = (Deletion) modification;
- PartialPath fullPath = this.concatNode(measurement);
- if (deletion.getPath().matchFullPath(fullPath)
- && deletion.getEndTime() > timeLowerBound) {
- long lowerBound = Math.max(deletion.getStartTime(),
timeLowerBound);
- columnDeletionList.add(new TimeRange(lowerBound,
deletion.getEndTime()));
- }
- }
- }
- deletionList.add(TimeRange.sortAndMerge(columnDeletionList));
- }
- return deletionList;
- }
-
- @Override
- public List<IChunkMetadata> getVisibleMetadataListFromWriter(
- RestorableTsFileIOWriter writer, TsFileResource tsFileResource,
QueryContext context) {
- ModificationFile modificationFile = tsFileResource.getModFile();
- List<List<Modification>> modifications =
context.getPathModifications(modificationFile, this);
-
- List<AlignedChunkMetadata> chunkMetadataList = new ArrayList<>();
- List<ChunkMetadata> timeChunkMetadataList =
- writer.getVisibleMetadataList(getDevice(), "", getSeriesType());
- List<List<ChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
- for (int i = 0; i < measurementList.size(); i++) {
- valueChunkMetadataList.add(
- writer.getVisibleMetadataList(
- getDevice(), measurementList.get(i),
schemaList.get(i).getType()));
- }
-
- for (int i = 0; i < timeChunkMetadataList.size(); i++) {
- List<IChunkMetadata> valueChunkMetadata = new ArrayList<>();
- // if all the sub sensors doesn't exist, it will be false
- boolean exits = false;
- for (List<ChunkMetadata> chunkMetadata : valueChunkMetadataList) {
- boolean currentExist = i < chunkMetadata.size();
- exits = (exits || currentExist);
- valueChunkMetadata.add(currentExist ? chunkMetadata.get(i) : null);
- }
- if (exits) {
- chunkMetadataList.add(
- new AlignedChunkMetadata(timeChunkMetadataList.get(i),
valueChunkMetadata));
- }
- }
-
- QueryUtils.modifyAlignedChunkMetaData(chunkMetadataList, modifications);
- chunkMetadataList.removeIf(context::chunkNotSatisfy);
- return new ArrayList<>(chunkMetadataList);
- }
-
@Override
public int getColumnNum() {
return measurementList.size();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index a3deb0325c..c141ffe4e7 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -19,50 +19,17 @@
package org.apache.iotdb.db.metadata.path;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.fill.LastPointReader;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
-import org.apache.iotdb.db.query.reader.series.SeriesReader;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
public class MeasurementPath extends PartialPath {
@@ -165,155 +132,6 @@ public class MeasurementPath extends PartialPath {
return isUnderAlignedEntity ? new AlignedPath(this) : this;
}
- @Override
- public LastPointReader createLastPointReader(
- TSDataType dataType,
- Set<String> deviceMeasurements,
- QueryContext context,
- QueryDataSource dataSource,
- long queryTime,
- Filter timeFilter) {
- return new LastPointReader(
- this, dataType, deviceMeasurements, context, dataSource, queryTime,
timeFilter);
- }
-
- public SeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- QueryDataSource dataSource,
- Filter timeFilter,
- Filter valueFilter,
- TsFileFilter fileFilter,
- boolean ascending) {
- return new SeriesReader(
- this,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
- }
-
- @TestOnly
- public SeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- allSensors.add(this.getMeasurement());
- return new SeriesReader(
- this,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- }
-
- @Override
- public TsFileResource createTsFileResource(
- List<ReadOnlyMemChunk> readOnlyMemChunk,
- List<IChunkMetadata> chunkMetadataList,
- TsFileResource originTsFileResource)
- throws IOException {
- TsFileResource tsFileResource =
- new TsFileResource(this, readOnlyMemChunk, chunkMetadataList,
originTsFileResource);
- tsFileResource.setTimeSeriesMetadata(
- this, generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
- return tsFileResource;
- }
-
- /**
- * Because the unclosed tsfile don't have TimeSeriesMetadata and memtables
in the memory don't
- * have chunkMetadata, but query will use these, so we need to generate it
for them.
- */
- public ITimeSeriesMetadata generateTimeSeriesMetadata(
- List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
- throws IOException {
- TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata();
- timeSeriesMetadata.setMeasurementId(measurementSchema.getMeasurementId());
- timeSeriesMetadata.setTSDataType(measurementSchema.getType());
- timeSeriesMetadata.setOffsetOfChunkMetaDataList(-1);
- timeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1);
-
- Statistics<? extends Serializable> seriesStatistics =
- Statistics.getStatsByType(timeSeriesMetadata.getTSDataType());
- // flush chunkMetadataList one by one
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
- }
-
- for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) {
- if (!memChunk.isEmpty()) {
-
seriesStatistics.mergeStatistics(memChunk.getChunkMetaData().getStatistics());
- }
- }
- timeSeriesMetadata.setStatistics(seriesStatistics);
- return timeSeriesMetadata;
- }
-
- @Override
- public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
- IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound)
- throws QueryProcessException, IOException {
- Map<IDeviceID, IWritableMemChunkGroup> memTableMap =
memTable.getMemTableMap();
- IDeviceID deviceID =
DeviceIDFactory.getInstance().getDeviceID(getDevicePath());
- // check If Memtable Contains this path
- if (!memTableMap.containsKey(deviceID)
- || !memTableMap.get(deviceID).contains(getMeasurement())) {
- return null;
- }
- IWritableMemChunk memChunk =
memTableMap.get(deviceID).getMemChunkMap().get(getMeasurement());
- // get sorted tv list is synchronized so different query can get right
sorted list reference
- TVList chunkCopy = memChunk.getSortedTvListForQuery();
- int curSize = chunkCopy.rowCount();
- List<TimeRange> deletionList = null;
- if (modsToMemtable != null) {
- deletionList = constructDeletionList(memTable, modsToMemtable,
timeLowerBound);
- }
- return new ReadOnlyMemChunk(
- getMeasurement(),
- measurementSchema.getType(),
- measurementSchema.getEncodingType(),
- chunkCopy,
- measurementSchema.getProps(),
- curSize,
- deletionList);
- }
-
- /**
- * construct a deletion list from a memtable.
- *
- * @param memTable memtable
- * @param timeLowerBound time water mark
- */
- private List<TimeRange> constructDeletionList(
- IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound) {
- List<TimeRange> deletionList = new ArrayList<>();
- deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
- for (Modification modification : getModificationsForMemtable(memTable,
modsToMemtable)) {
- if (modification instanceof Deletion) {
- Deletion deletion = (Deletion) modification;
- if (deletion.getPath().matchFullPath(this) && deletion.getEndTime() >
timeLowerBound) {
- long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound);
- deletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
- }
- }
- }
- return TimeRange.sortAndMerge(deletionList);
- }
-
@Override
public MeasurementPath clone() {
MeasurementPath newMeasurementPath = null;
@@ -327,21 +145,6 @@ public class MeasurementPath extends PartialPath {
return newMeasurementPath;
}
- @Override
- public List<IChunkMetadata> getVisibleMetadataListFromWriter(
- RestorableTsFileIOWriter writer, TsFileResource tsFileResource,
QueryContext context) {
- ModificationFile modificationFile = tsFileResource.getModFile();
- List<Modification> modifications =
context.getPathModifications(modificationFile, this);
-
- List<IChunkMetadata> chunkMetadataList =
- new ArrayList<>(
- writer.getVisibleMetadataList(getDevice(), getMeasurement(),
getSeriesType()));
-
- QueryUtils.modifyChunkMetaData(chunkMetadataList, modifications);
- chunkMetadataList.removeIf(context::chunkNotSatisfy);
- return chunkMetadataList;
- }
-
public void serialize(ByteBuffer byteBuffer) {
PathType.Measurement.serialize(byteBuffer);
super.serializeWithoutType(byteBuffer);
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index b730d68629..24c11abfda 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -19,41 +19,24 @@
package org.apache.iotdb.db.metadata.path;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.fill.LastPointReader;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
-import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -484,90 +467,11 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
return ret;
}
- public LastPointReader createLastPointReader(
- TSDataType dataType,
- Set<String> deviceMeasurements,
- QueryContext context,
- QueryDataSource dataSource,
- long queryTime,
- Filter timeFilter) {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
- public SeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- QueryDataSource dataSource,
- Filter timeFilter,
- Filter valueFilter,
- TsFileFilter fileFilter,
- boolean ascending) {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
- @TestOnly
- public SeriesReader createSeriesReader(
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
- public TsFileResource createTsFileResource(
- List<ReadOnlyMemChunk> readOnlyMemChunk,
- List<IChunkMetadata> chunkMetadataList,
- TsFileResource originTsFileResource)
- throws IOException {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
- public ITimeSeriesMetadata generateTimeSeriesMetadata(
- List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
- throws IOException {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
- /**
- * get the ReadOnlyMemChunk from the given MemTable.
- *
- * @return ReadOnlyMemChunk
- */
- public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
- IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound)
- throws QueryProcessException, IOException {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
- /** get modifications from a memtable. */
- protected List<Modification> getModificationsForMemtable(
- IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable) {
- List<Modification> modifications = new ArrayList<>();
- boolean foundMemtable = false;
- for (Pair<Modification, IMemTable> entry : modsToMemtable) {
- if (foundMemtable || entry.right.equals(memTable)) {
- modifications.add(entry.left);
- foundMemtable = true;
- }
- }
- return modifications;
- }
-
@Override
public PartialPath clone() {
return new PartialPath(this.getNodes().clone());
}
- public List<IChunkMetadata> getVisibleMetadataListFromWriter(
- RestorableTsFileIOWriter writer, TsFileResource tsFileResource,
QueryContext context) {
- throw new UnsupportedOperationException("Should call exact sub class!");
- }
-
public void serialize(ByteBuffer byteBuffer) {
PathType.Partial.serialize(byteBuffer);
serializeWithoutType(byteBuffer);
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
b/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
similarity index 51%
copy from
server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
copy to
server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
index ee0e6a768a..35929a0bb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.iotdb.db.metadata.path;
+package org.apache.iotdb.db.metadata.utils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
@@ -31,20 +31,25 @@ import
org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.AlignedLastPointReader;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -52,202 +57,101 @@ import
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
+import static org.apache.iotdb.db.metadata.path.AlignedPath.VECTOR_PLACEHOLDER;
+
/**
- * VectorPartialPath represents many fullPaths of aligned timeseries. In the
AlignedPath, the nodes
- * in PartialPath is deviceId e.g. VectorPartialPath nodes=root.sg1.alignedD1
measurementList=[s1,
- * s2]
+ * Obtain required resources through path, such as readers and writers and
etc. AlignedPath and
+ * MeasurementPath have different implementations, and the default PartialPath
should not use it.
*/
-public class AlignedPath extends PartialPath {
-
- private static final Logger logger =
LoggerFactory.getLogger(AlignedPath.class);
-
- // todo improve vector implementation by remove this placeholder
- public static final String VECTOR_PLACEHOLDER = "";
-
- private List<String> measurementList;
- private List<IMeasurementSchema> schemaList;
-
- public AlignedPath() {}
-
- public AlignedPath(String vectorPath, List<String> subSensorsList) throws
IllegalPathException {
- super(vectorPath);
- this.measurementList = subSensorsList;
- }
-
- public AlignedPath(
- String vectorPath, List<String> measurementList,
List<IMeasurementSchema> schemaList)
- throws IllegalPathException {
- super(vectorPath);
- this.measurementList = measurementList;
- this.schemaList = schemaList;
- }
-
- public AlignedPath(String vectorPath, String subSensor) throws
IllegalPathException {
- super(vectorPath);
- measurementList = new ArrayList<>();
- measurementList.add(subSensor);
- }
+public abstract class ResourceByPathUtils {
- public AlignedPath(PartialPath vectorPath, String subSensor) {
- super(vectorPath.getNodes());
- measurementList = new ArrayList<>();
- measurementList.add(subSensor);
- }
-
- public AlignedPath(MeasurementPath path) {
- super(path.getDevicePath().getNodes());
- measurementList = new ArrayList<>();
- measurementList.add(path.getMeasurement());
- schemaList = new ArrayList<>();
- schemaList.add(path.getMeasurementSchema());
- }
-
- public AlignedPath(String vectorPath) throws IllegalPathException {
- super(vectorPath);
- measurementList = new ArrayList<>();
- schemaList = new ArrayList<>();
- }
-
- public PartialPath getDevicePath() {
- return new PartialPath(Arrays.copyOf(nodes, nodes.length));
- }
-
- @Override
- public String getDevice() {
- return getFullPath();
- }
-
- @Override
- public String getMeasurement() {
- throw new UnsupportedOperationException("AlignedPath doesn't have
measurement name!");
- }
-
- public List<String> getMeasurementList() {
- return measurementList;
- }
-
- public String getMeasurement(int index) {
- return measurementList.get(index);
- }
-
- public PartialPath getPathWithMeasurement(int index) {
- return new PartialPath(nodes).concatNode(measurementList.get(index));
- }
-
- public void setMeasurementList(List<String> measurementList) {
- this.measurementList = measurementList;
- }
-
- public void addMeasurements(List<String> measurements) {
- this.measurementList.addAll(measurements);
- }
-
- public void addSchemas(List<IMeasurementSchema> schemas) {
- this.schemaList.addAll(schemas);
+ public static ResourceByPathUtils getResourceInstance(PartialPath path) {
+ if (path instanceof AlignedPath) {
+ return new AlignedResourceByPathUtils(path);
+ } else if (path instanceof MeasurementPath) {
+ return new MeasurementResourceByPathUtils(path);
+ }
+ throw new UnsupportedOperationException("Should call exact sub class!");
}
- public void addMeasurement(MeasurementPath measurementPath) {
- if (measurementList == null) {
- measurementList = new ArrayList<>();
- }
- measurementList.add(measurementPath.getMeasurement());
+ public abstract LastPointReader createLastPointReader(
+ TSDataType dataType,
+ Set<String> deviceMeasurements,
+ QueryContext context,
+ QueryDataSource dataSource,
+ long queryTime,
+ Filter timeFilter);
- if (schemaList == null) {
- schemaList = new ArrayList<>();
- }
- schemaList.add(measurementPath.getMeasurementSchema());
- }
+ public abstract SeriesReader createSeriesReader(
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending);
- /**
- * merge another aligned path's sub sensors into this one
- *
- * @param alignedPath The caller need to ensure the alignedPath must have
same device as this one
- * and these two doesn't have same sub sensor
- */
- public void mergeAlignedPath(AlignedPath alignedPath) {
- if (measurementList == null) {
- measurementList = new ArrayList<>();
- }
- measurementList.addAll(alignedPath.measurementList);
- if (schemaList == null) {
- schemaList = new ArrayList<>();
- }
- schemaList.addAll(alignedPath.schemaList);
- }
+ @TestOnly
+ public abstract SeriesReader createSeriesReader(
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending);
- public List<IMeasurementSchema> getSchemaList() {
- return this.schemaList == null ? Collections.emptyList() : this.schemaList;
- }
+ public abstract TsFileResource createTsFileResource(
+ List<ReadOnlyMemChunk> readOnlyMemChunk,
+ List<IChunkMetadata> chunkMetadataList,
+ TsFileResource originTsFileResource)
+ throws IOException;
- public VectorMeasurementSchema getMeasurementSchema() {
- TSDataType[] types = new TSDataType[measurementList.size()];
- TSEncoding[] encodings = new TSEncoding[measurementList.size()];
+ public abstract ITimeSeriesMetadata generateTimeSeriesMetadata(
+ List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
+ throws IOException;
- for (int i = 0; i < measurementList.size(); i++) {
- types[i] = schemaList.get(i).getType();
- encodings[i] = schemaList.get(i).getEncodingType();
- }
- String[] array = new String[measurementList.size()];
- for (int i = 0; i < array.length; i++) {
- array[i] = measurementList.get(i);
+ public abstract ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
+ IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound)
+ throws QueryProcessException, IOException;
+
+ public abstract List<IChunkMetadata> getVisibleMetadataListFromWriter(
+ RestorableTsFileIOWriter writer, TsFileResource tsFileResource,
QueryContext context);
+
+ /** get modifications from a memtable. */
+ protected List<Modification> getModificationsForMemtable(
+ IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable) {
+ List<Modification> modifications = new ArrayList<>();
+ boolean foundMemtable = false;
+ for (Pair<Modification, IMemTable> entry : modsToMemtable) {
+ if (foundMemtable || entry.right.equals(memTable)) {
+ modifications.add(entry.left);
+ foundMemtable = true;
+ }
}
- return new VectorMeasurementSchema(
- VECTOR_PLACEHOLDER, array, types, encodings,
schemaList.get(0).getCompressor());
+ return modifications;
}
+}
- public TSDataType getSeriesType() {
- return TSDataType.VECTOR;
- }
+class AlignedResourceByPathUtils extends ResourceByPathUtils {
- @Override
- public PartialPath copy() {
- AlignedPath result = new AlignedPath();
- result.nodes = nodes;
- result.fullPath = fullPath;
- result.device = device;
- result.measurementList = new ArrayList<>(measurementList);
- result.schemaList = new ArrayList<>(schemaList);
- return result;
- }
+ AlignedPath partialPath;
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- AlignedPath that = (AlignedPath) o;
- return Objects.equals(measurementList, that.measurementList);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), measurementList);
+ public AlignedResourceByPathUtils(PartialPath partialPath) {
+ this.partialPath = (AlignedPath) partialPath;
}
@Override
@@ -259,7 +163,7 @@ public class AlignedPath extends PartialPath {
long queryTime,
Filter timeFilter) {
return new AlignedLastPointReader(
- this, dataType, deviceMeasurements, context, dataSource, queryTime,
timeFilter);
+ partialPath, dataType, deviceMeasurements, context, dataSource,
queryTime, timeFilter);
}
@Override
@@ -273,7 +177,7 @@ public class AlignedPath extends PartialPath {
TsFileFilter fileFilter,
boolean ascending) {
return new AlignedSeriesReader(
- this,
+ partialPath,
allSensors,
dataType,
context,
@@ -296,7 +200,7 @@ public class AlignedPath extends PartialPath {
Filter valueFilter,
boolean ascending) {
return new AlignedSeriesReader(
- this,
+ partialPath,
allSensors,
dataType,
context,
@@ -314,9 +218,9 @@ public class AlignedPath extends PartialPath {
TsFileResource originTsFileResource)
throws IOException {
TsFileResource tsFileResource =
- new TsFileResource(this, readOnlyMemChunk, chunkMetadataList,
originTsFileResource);
+ new TsFileResource(partialPath, readOnlyMemChunk, chunkMetadataList,
originTsFileResource);
tsFileResource.setTimeSeriesMetadata(
- this, generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList));
+ partialPath, generateTimeSeriesMetadata(readOnlyMemChunk,
chunkMetadataList));
return tsFileResource;
}
@@ -324,6 +228,7 @@ public class AlignedPath extends PartialPath {
* Because the unclosed tsfile don't have TimeSeriesMetadata and memtables
in the memory don't
* have chunkMetadata, but query will use these, so we need to generate it
for them.
*/
+ @Override
public AlignedTimeSeriesMetadata generateTimeSeriesMetadata(
List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
throws IOException {
@@ -338,7 +243,7 @@ public class AlignedPath extends PartialPath {
// init each value time series meta
List<TimeseriesMetadata> valueTimeSeriesMetadataList = new ArrayList<>();
- for (IMeasurementSchema valueChunkMetadata : schemaList) {
+ for (IMeasurementSchema valueChunkMetadata :
(partialPath.getSchemaList())) {
TimeseriesMetadata valueMetadata = new TimeseriesMetadata();
valueMetadata.setOffsetOfChunkMetaDataList(-1);
valueMetadata.setDataSizeOfChunkMetaDataList(-1);
@@ -348,7 +253,7 @@ public class AlignedPath extends PartialPath {
valueTimeSeriesMetadataList.add(valueMetadata);
}
- boolean[] exist = new boolean[schemaList.size()];
+ boolean[] exist = new boolean[partialPath.getSchemaList().size()];
for (IChunkMetadata chunkMetadata : chunkMetadataList) {
AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata)
chunkMetadata;
timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
@@ -397,7 +302,7 @@ public class AlignedPath extends PartialPath {
IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound)
throws QueryProcessException, IOException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap =
memTable.getMemTableMap();
- IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(this);
+ IDeviceID deviceID =
DeviceIDFactory.getInstance().getDeviceID(partialPath);
// check If memtable contains this path
if (!memTableMap.containsKey(deviceID)) {
@@ -406,7 +311,7 @@ public class AlignedPath extends PartialPath {
AlignedWritableMemChunk alignedMemChunk =
((AlignedWritableMemChunkGroup)
memTableMap.get(deviceID)).getAlignedMemChunk();
boolean containsMeasurement = false;
- for (String measurement : measurementList) {
+ for (String measurement : partialPath.getMeasurementList()) {
if (alignedMemChunk.containsMeasurement(measurement)) {
containsMeasurement = true;
break;
@@ -416,7 +321,7 @@ public class AlignedPath extends PartialPath {
return null;
}
// get sorted tv list is synchronized so different query can get right
sorted list reference
- TVList alignedTvListCopy =
alignedMemChunk.getSortedTvListForQuery(schemaList);
+ TVList alignedTvListCopy =
alignedMemChunk.getSortedTvListForQuery(partialPath.getSchemaList());
int curSize = alignedTvListCopy.rowCount();
List<List<TimeRange>> deletionList = null;
if (modsToMemtable != null) {
@@ -426,16 +331,37 @@ public class AlignedPath extends PartialPath {
getMeasurementSchema(), alignedTvListCopy, curSize, deletionList);
}
+ public VectorMeasurementSchema getMeasurementSchema() {
+ List<String> measurementList = partialPath.getMeasurementList();
+ TSDataType[] types = new TSDataType[measurementList.size()];
+ TSEncoding[] encodings = new TSEncoding[measurementList.size()];
+
+ for (int i = 0; i < measurementList.size(); i++) {
+ types[i] = partialPath.getSchemaList().get(i).getType();
+ encodings[i] = partialPath.getSchemaList().get(i).getEncodingType();
+ }
+ String[] array = new String[measurementList.size()];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = measurementList.get(i);
+ }
+ return new VectorMeasurementSchema(
+ VECTOR_PLACEHOLDER,
+ array,
+ types,
+ encodings,
+ partialPath.getSchemaList().get(0).getCompressor());
+ }
+
private List<List<TimeRange>> constructDeletionList(
IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound) {
List<List<TimeRange>> deletionList = new ArrayList<>();
- for (String measurement : measurementList) {
+ for (String measurement : partialPath.getMeasurementList()) {
List<TimeRange> columnDeletionList = new ArrayList<>();
columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
for (Modification modification : getModificationsForMemtable(memTable,
modsToMemtable)) {
if (modification instanceof Deletion) {
Deletion deletion = (Deletion) modification;
- PartialPath fullPath = this.concatNode(measurement);
+ PartialPath fullPath = partialPath.concatNode(measurement);
if (deletion.getPath().matchFullPath(fullPath)
&& deletion.getEndTime() > timeLowerBound) {
long lowerBound = Math.max(deletion.getStartTime(),
timeLowerBound);
@@ -452,16 +378,19 @@ public class AlignedPath extends PartialPath {
public List<IChunkMetadata> getVisibleMetadataListFromWriter(
RestorableTsFileIOWriter writer, TsFileResource tsFileResource,
QueryContext context) {
ModificationFile modificationFile = tsFileResource.getModFile();
- List<List<Modification>> modifications =
context.getPathModifications(modificationFile, this);
+ List<List<Modification>> modifications =
+ context.getPathModifications(modificationFile, partialPath);
List<AlignedChunkMetadata> chunkMetadataList = new ArrayList<>();
List<ChunkMetadata> timeChunkMetadataList =
- writer.getVisibleMetadataList(getDevice(), "", getSeriesType());
+ writer.getVisibleMetadataList(partialPath.getDevice(), "",
partialPath.getSeriesType());
List<List<ChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
- for (int i = 0; i < measurementList.size(); i++) {
+ for (int i = 0; i < partialPath.getMeasurementList().size(); i++) {
valueChunkMetadataList.add(
writer.getVisibleMetadataList(
- getDevice(), measurementList.get(i),
schemaList.get(i).getType()));
+ partialPath.getDevice(),
+ partialPath.getMeasurementList().get(i),
+ partialPath.getSchemaList().get(i).getType()));
}
for (int i = 0; i < timeChunkMetadataList.size(); i++) {
@@ -483,76 +412,196 @@ public class AlignedPath extends PartialPath {
chunkMetadataList.removeIf(context::chunkNotSatisfy);
return new ArrayList<>(chunkMetadataList);
}
+}
+
+class MeasurementResourceByPathUtils extends ResourceByPathUtils {
+
+ MeasurementPath partialPath;
+
+ protected MeasurementResourceByPathUtils(PartialPath partialPath) {
+ this.partialPath = (MeasurementPath) partialPath;
+ }
@Override
- public int getColumnNum() {
- return measurementList.size();
+ public LastPointReader createLastPointReader(
+ TSDataType dataType,
+ Set<String> deviceMeasurements,
+ QueryContext context,
+ QueryDataSource dataSource,
+ long queryTime,
+ Filter timeFilter) {
+ return new LastPointReader(
+ partialPath, dataType, deviceMeasurements, context, dataSource,
queryTime, timeFilter);
}
@Override
- public AlignedPath clone() {
- AlignedPath alignedPath = null;
- try {
- alignedPath =
- new AlignedPath(
- this.getDevice(),
- new ArrayList<>(this.measurementList),
- new ArrayList<>(this.schemaList));
- } catch (IllegalPathException e) {
- logger.warn("path is illegal: {}", this.getFullPath(), e);
- }
- return alignedPath;
+ public SeriesReader createSeriesReader(
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ return new SeriesReader(
+ partialPath,
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ }
+
+ @TestOnly
+ public SeriesReader createSeriesReader(
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ allSensors.add(partialPath.getMeasurement());
+ return new SeriesReader(
+ partialPath,
+ allSensors,
+ dataType,
+ context,
+ seqFileResource,
+ unseqFileResource,
+ timeFilter,
+ valueFilter,
+ ascending);
+ }
+
+ @Override
+ public TsFileResource createTsFileResource(
+ List<ReadOnlyMemChunk> readOnlyMemChunk,
+ List<IChunkMetadata> chunkMetadataList,
+ TsFileResource originTsFileResource)
+ throws IOException {
+ TsFileResource tsFileResource =
+ new TsFileResource(partialPath, readOnlyMemChunk, chunkMetadataList,
originTsFileResource);
+ tsFileResource.setTimeSeriesMetadata(
+ partialPath, generateTimeSeriesMetadata(readOnlyMemChunk,
chunkMetadataList));
+ return tsFileResource;
}
- public void serialize(ByteBuffer byteBuffer) {
- PathType.Aligned.serialize(byteBuffer);
- super.serializeWithoutType(byteBuffer);
- ReadWriteIOUtils.write(measurementList.size(), byteBuffer);
- for (String measurement : measurementList) {
- ReadWriteIOUtils.write(measurement, byteBuffer);
+ /**
+ * Because the unclosed tsfile don't have TimeSeriesMetadata and memtables
in the memory don't
+ * have chunkMetadata, but query will use these, so we need to generate it
for them.
+ */
+ @Override
+ public ITimeSeriesMetadata generateTimeSeriesMetadata(
+ List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
+ throws IOException {
+ TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata();
+
timeSeriesMetadata.setMeasurementId(partialPath.getMeasurementSchema().getMeasurementId());
+
timeSeriesMetadata.setTSDataType(partialPath.getMeasurementSchema().getType());
+ timeSeriesMetadata.setOffsetOfChunkMetaDataList(-1);
+ timeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1);
+
+ Statistics<? extends Serializable> seriesStatistics =
+ Statistics.getStatsByType(timeSeriesMetadata.getTSDataType());
+ // flush chunkMetadataList one by one
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
- if (schemaList == null) {
- ReadWriteIOUtils.write(-1, byteBuffer);
- } else {
- ReadWriteIOUtils.write(schemaList.size(), byteBuffer);
- for (IMeasurementSchema measurementSchema : schemaList) {
- if (measurementSchema instanceof MeasurementSchema) {
- ReadWriteIOUtils.write((byte) 0, byteBuffer);
- } else if (measurementSchema instanceof VectorMeasurementSchema) {
- ReadWriteIOUtils.write((byte) 1, byteBuffer);
- }
- measurementSchema.serializeTo(byteBuffer);
+
+ for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) {
+ if (!memChunk.isEmpty()) {
+
seriesStatistics.mergeStatistics(memChunk.getChunkMetaData().getStatistics());
}
}
+ timeSeriesMetadata.setStatistics(seriesStatistics);
+ return timeSeriesMetadata;
}
- public static AlignedPath deserialize(ByteBuffer byteBuffer) {
- PartialPath partialPath = PartialPath.deserialize(byteBuffer);
- AlignedPath alignedPath = new AlignedPath();
- int measurementSize = ReadWriteIOUtils.readInt(byteBuffer);
- List<String> measurements = new ArrayList<>();
- for (int i = 0; i < measurementSize; i++) {
- measurements.add(ReadWriteIOUtils.readString(byteBuffer));
+ @Override
+ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
+ IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound)
+ throws QueryProcessException, IOException {
+ Map<IDeviceID, IWritableMemChunkGroup> memTableMap =
memTable.getMemTableMap();
+ IDeviceID deviceID =
DeviceIDFactory.getInstance().getDeviceID(partialPath.getDevicePath());
+ // check If Memtable Contains this path
+ if (!memTableMap.containsKey(deviceID)
+ || !memTableMap.get(deviceID).contains(partialPath.getMeasurement())) {
+ return null;
}
- int measurementSchemaSize = ReadWriteIOUtils.readInt(byteBuffer);
- List<IMeasurementSchema> measurementSchemas = null;
- if (measurementSchemaSize != -1) {
- measurementSchemas = new ArrayList<>();
- for (int i = 0; i < measurementSchemaSize; i++) {
- byte type = ReadWriteIOUtils.readByte(byteBuffer);
- if (type == 0) {
-
measurementSchemas.add(MeasurementSchema.deserializeFrom(byteBuffer));
- } else if (type == 1) {
-
measurementSchemas.add(VectorMeasurementSchema.deserializeFrom(byteBuffer));
+ IWritableMemChunk memChunk =
+
memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement());
+ // get sorted tv list is synchronized so different query can get right
sorted list reference
+ TVList chunkCopy = memChunk.getSortedTvListForQuery();
+ int curSize = chunkCopy.rowCount();
+ List<TimeRange> deletionList = null;
+ if (modsToMemtable != null) {
+ deletionList = constructDeletionList(memTable, modsToMemtable,
timeLowerBound);
+ }
+ return new ReadOnlyMemChunk(
+ partialPath.getMeasurement(),
+ partialPath.getMeasurementSchema().getType(),
+ partialPath.getMeasurementSchema().getEncodingType(),
+ chunkCopy,
+ partialPath.getMeasurementSchema().getProps(),
+ curSize,
+ deletionList);
+ }
+ /**
+ * construct a deletion list from a memtable.
+ *
+ * @param memTable memtable
+ * @param timeLowerBound time water mark
+ */
+ private List<TimeRange> constructDeletionList(
+ IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable,
long timeLowerBound) {
+ List<TimeRange> deletionList = new ArrayList<>();
+ deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
+ for (Modification modification : getModificationsForMemtable(memTable,
modsToMemtable)) {
+ if (modification instanceof Deletion) {
+ Deletion deletion = (Deletion) modification;
+ if (deletion.getPath().matchFullPath(partialPath)
+ && deletion.getEndTime() > timeLowerBound) {
+ long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound);
+ deletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
}
}
}
+ return TimeRange.sortAndMerge(deletionList);
+ }
+ /** get modifications from a memtable. */
+ protected List<Modification> getModificationsForMemtable(
+ IMemTable memTable, List<Pair<Modification, IMemTable>> modsToMemtable) {
+ List<Modification> modifications = new ArrayList<>();
+ boolean foundMemtable = false;
+ for (Pair<Modification, IMemTable> entry : modsToMemtable) {
+ if (foundMemtable || entry.right.equals(memTable)) {
+ modifications.add(entry.left);
+ foundMemtable = true;
+ }
+ }
+ return modifications;
+ }
+
+ @Override
+ public List<IChunkMetadata> getVisibleMetadataListFromWriter(
+ RestorableTsFileIOWriter writer, TsFileResource tsFileResource,
QueryContext context) {
+ ModificationFile modificationFile = tsFileResource.getModFile();
+ List<Modification> modifications =
context.getPathModifications(modificationFile, partialPath);
+
+ List<IChunkMetadata> chunkMetadataList =
+ new ArrayList<>(
+ writer.getVisibleMetadataList(
+ partialPath.getDevice(),
+ partialPath.getMeasurement(),
+ partialPath.getSeriesType()));
- alignedPath.measurementList = measurements;
- alignedPath.schemaList = measurementSchemas;
- alignedPath.nodes = partialPath.nodes;
- alignedPath.device = partialPath.getDevice();
- alignedPath.fullPath = partialPath.getFullPath();
- return alignedPath;
+ QueryUtils.modifyChunkMetaData(chunkMetadataList, modifications);
+ chunkMetadataList.removeIf(context::chunkNotSatisfy);
+ return chunkMetadataList;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index d3501141ce..fa280392ad 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -283,8 +284,7 @@ public class LastQueryExecutor {
QueryResourceManager.getInstance()
.getQueryDataSource(seriesPaths.get(i), context, filter,
ascending);
LastPointReader lastReader =
- seriesPaths
- .get(i)
+ ResourceByPathUtils.getResourceInstance(seriesPaths.get(i))
.createLastPointReader(
dataTypes.get(i),
deviceMeasurementsMap.getOrDefault(
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index 2371b9609d..2f0282c591 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -48,15 +49,16 @@ public class SeriesAggregateReader implements
IAggregateReader {
TsFileFilter fileFilter,
boolean ascending) {
this.seriesReader =
- seriesPath.createSeriesReader(
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
+ ResourceByPathUtils.getResourceInstance(seriesPath)
+ .createSeriesReader(
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
}
@TestOnly
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index b0b9d204cf..4fdb2c8e4a 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -58,15 +59,16 @@ public class SeriesRawDataBatchReader implements
ManagedSeriesReader {
TsFileFilter fileFilter,
boolean ascending) {
this.seriesReader =
- seriesPath.createSeriesReader(
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
+ ResourceByPathUtils.getResourceInstance(seriesPath)
+ .createSeriesReader(
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
}
@TestOnly
@@ -82,15 +84,16 @@ public class SeriesRawDataBatchReader implements
ManagedSeriesReader {
boolean ascending) {
Set<String> allSensors = new HashSet<>();
this.seriesReader =
- seriesPath.createSeriesReader(
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
+ ResourceByPathUtils.getResourceInstance(seriesPath)
+ .createSeriesReader(
+ allSensors,
+ dataType,
+ context,
+ seqFileResource,
+ unseqFileResource,
+ timeFilter,
+ valueFilter,
+ ascending);
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index f9e12ebb86..9b7fc3816c 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,8 +51,9 @@ public class SeriesReaderByTimestamp implements
IReaderByTimestamp {
boolean ascending) {
Filter timeFilter = TimeFilter.defaultTimeFilter(ascending);
this.seriesReader =
- seriesPath.createSeriesReader(
- allSensors, dataType, context, dataSource, timeFilter, null,
fileFilter, ascending);
+ ResourceByPathUtils.getResourceInstance(seriesPath)
+ .createSeriesReader(
+ allSensors, dataType, context, dataSource, timeFilter, null,
fileFilter, ascending);
this.ascending = ascending;
}