This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bef8a97 [IOTDB-631] Using new TsFile MetadataIndex to optimize query
and cache (#1134)
bef8a97 is described below
commit bef8a97bf5d78267ce89063ca0409c4ee0438476
Author: Zesong Sun <[email protected]>
AuthorDate: Wed May 6 19:43:56 2020 +0800
[IOTDB-631] Using new TsFile MetadataIndex to optimize query and cache
(#1134)
* [IOTDB-631] Using new TsFile MetadataIndex to optimize query and cache
---
.../db/engine/cache/TimeSeriesMetadataCache.java | 56 +++++----
.../db/qp/physical/crud/RawDataQueryPlan.java | 16 ++-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 70 +++++++++--
.../reader/series/SeriesRawDataBatchReader.java | 24 ++--
.../query/timegenerator/ServerTimeGenerator.java | 5 +-
.../db/integration/IoTDBEngineTimeGeneratorIT.java | 37 ++++--
.../tsfile/file/metadata/MetadataIndexNode.java | 2 +-
.../file/metadata/enums/MetadataIndexNodeType.java | 9 ++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 130 ++++++++++++++++++---
.../read/controller/MetadataQuerierByFileImpl.java | 10 +-
10 files changed, 278 insertions(+), 81 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index e586cde..e620cd1 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -19,6 +19,13 @@
package org.apache.iotdb.db.engine.cache;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -30,22 +37,17 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The
caching
- * strategy is LRU.
+ * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The
caching strategy is
+ * LRU.
*/
public class TimeSeriesMetadataCache {
private static final Logger logger =
LoggerFactory.getLogger(TimeSeriesMetadataCache.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE =
config.getAllocateMemoryForTimeSeriesMetaDataCache();
+ private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE =
config
+ .getAllocateMemoryForTimeSeriesMetaDataCache();
private static boolean cacheEnable = config.isMetaDataCacheEnable();
private final LRULinkedHashMap<TimeSeriesMetadataCacheKey,
TimeseriesMetadata> lruCache;
@@ -58,9 +60,11 @@ public class TimeSeriesMetadataCache {
private TimeSeriesMetadataCache() {
logger.info("TimeseriesMetadataCache size = " +
MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE);
- lruCache = new LRULinkedHashMap<TimeSeriesMetadataCacheKey,
TimeseriesMetadata>(MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE, true) {
+ lruCache = new LRULinkedHashMap<TimeSeriesMetadataCacheKey,
TimeseriesMetadata>(
+ MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE, true) {
int count = 0;
long averageSize = 0;
+
@Override
protected long calEntrySize(TimeSeriesMetadataCacheKey key,
TimeseriesMetadata value) {
if (count < 10) {
@@ -83,7 +87,8 @@ public class TimeSeriesMetadataCache {
return TimeSeriesMetadataCache.TimeSeriesMetadataCacheHolder.INSTANCE;
}
- public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String>
allSensors) throws IOException {
+ public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String>
allSensors)
+ throws IOException {
if (!cacheEnable) {
// bloom filter part
TsFileMetadata fileMetaData =
TsFileMetaDataCache.getInstance().get(key.filePath);
@@ -125,9 +130,13 @@ public class TimeSeriesMetadataCache {
return null;
}
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(key.filePath, true);
- TimeseriesMetadata timeseriesMetadata =
reader.readTimeseriesMetadata(new Path(key.device, key.measurement));
- lruCache.put(key, timeseriesMetadata);
- return timeseriesMetadata;
+ List<TimeseriesMetadata> timeSeriesMetadataList = reader
+ .readTimeseriesMetadata(key.device, allSensors);
+ // put TimeSeriesMetadata of all sensors used in this query into cache
+ timeSeriesMetadataList.forEach(timeseriesMetadata ->
+ lruCache.put(new TimeSeriesMetadataCacheKey(key.filePath, key.device,
+ timeseriesMetadata.getMeasurementId()), timeseriesMetadata));
+ return lruCache.get(key);
} catch (IOException e) {
logger.error("something wrong happened while reading {}", key.filePath);
throw e;
@@ -143,9 +152,9 @@ public class TimeSeriesMetadataCache {
return;
}
logger.debug(
- "[TimeSeriesMetadata cache {}hit] The number of requests for cache
is {}, hit rate is {}.",
- isHit ? "" : "didn't ", cacheRequestNum.get(),
- cacheHitNum.get() * 1.0 / cacheRequestNum.get());
+ "[TimeSeriesMetadata cache {}hit] The number of requests for cache is
{}, hit rate is {}.",
+ isHit ? "" : "didn't ", cacheRequestNum.get(),
+ cacheHitNum.get() * 1.0 / cacheRequestNum.get());
}
public double calculateTimeSeriesMetadataHitRatio() {
@@ -177,6 +186,7 @@ public class TimeSeriesMetadataCache {
}
public static class TimeSeriesMetadataCacheKey {
+
private String filePath;
private String device;
private String measurement;
@@ -189,12 +199,16 @@ public class TimeSeriesMetadataCache {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
TimeSeriesMetadataCacheKey that = (TimeSeriesMetadataCacheKey) o;
return Objects.equals(filePath, that.filePath) &&
- Objects.equals(device, that.device) &&
- Objects.equals(measurement, that.measurement);
+ Objects.equals(device, that.device) &&
+ Objects.equals(measurement, that.measurement);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index eeb4b5b..6f3a584 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -18,11 +18,17 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
-import java.util.*;
public class RawDataQueryPlan extends QueryPlan {
@@ -52,7 +58,8 @@ public class RawDataQueryPlan extends QueryPlan {
}
public void addDeduplicatedPaths(Path path) {
- deviceToMeasurements.computeIfAbsent(path.getDevice(), key -> new
HashSet<>()).add(path.getMeasurement());
+ deviceToMeasurements.computeIfAbsent(path.getDevice(), key -> new
HashSet<>())
+ .add(path.getMeasurement());
this.deduplicatedPaths.add(path);
}
@@ -85,4 +92,9 @@ public class RawDataQueryPlan extends QueryPlan {
return deviceToMeasurements.getOrDefault(device, Collections.emptySet());
}
+ public void addFilterPathInDeviceToMeasurements(Path path) {
+ deviceToMeasurements.computeIfAbsent(path.getDevice(), key -> new
HashSet<>())
+ .add(path.getMeasurement());
+ }
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 7f97a8a..6ee34a8 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -18,6 +18,14 @@
*/
package org.apache.iotdb.db.qp.strategy;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
@@ -27,22 +35,67 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
-import org.apache.iotdb.db.qp.logical.crud.*;
-import org.apache.iotdb.db.qp.logical.sys.*;
+import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
+import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.db.qp.logical.sys.CountOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateTimeSeriesOperator;
+import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
+import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
+import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
+import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
+import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator;
+import org.apache.iotdb.db.qp.logical.sys.MoveFileOperator;
+import org.apache.iotdb.db.qp.logical.sys.RemoveFileOperator;
+import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
+import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.utils.Pair;
-import java.util.*;
-
-/** Used to convert logical operator to physical plan */
+/**
+ * Used to convert logical operator to physical plan
+ */
public class PhysicalGenerator {
public PhysicalPlan transformToPhysicalPlan(Operator operator) throws
QueryProcessException {
@@ -382,7 +435,7 @@ public class PhysicalGenerator {
} catch (MetadataException e) {
throw new LogicalOptimizeException(
String.format(
- "Error when getting all paths of a full path: %s",
fullPath.getFullPath())
+ "Error when getting all paths of a full path: %s",
fullPath.getFullPath())
+ e.getMessage());
}
}
@@ -432,6 +485,7 @@ public class PhysicalGenerator {
List<TSDataType> seriesTypes = getSeriesTypes(filterPaths);
HashMap<Path, TSDataType> pathTSDataTypeHashMap = new HashMap<>();
for (int i = 0; i < filterPaths.size(); i++) {
+ ((RawDataQueryPlan)
queryPlan).addFilterPathInDeviceToMeasurements(filterPaths.get(i));
pathTSDataTypeHashMap.put(filterPaths.get(i), seriesTypes.get(i));
}
IExpression expression =
filterOperator.transformToExpression(pathTSDataTypeHashMap);
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 6e11dd3..cdb0523 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.db.query.reader.series;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -28,11 +32,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
public class SeriesRawDataBatchReader implements ManagedSeriesReader {
private final SeriesReader seriesReader;
@@ -48,18 +47,21 @@ public class SeriesRawDataBatchReader implements
ManagedSeriesReader {
this.seriesReader = seriesReader;
}
- public SeriesRawDataBatchReader(Path seriesPath, Set<String> allSensors,
TSDataType dataType, QueryContext context,
- QueryDataSource dataSource, Filter
timeFilter, Filter valueFilter, TsFileFilter fileFilter) {
- this.seriesReader = new SeriesReader(seriesPath, allSensors, dataType,
context, dataSource, timeFilter,
- valueFilter, fileFilter);
+ public SeriesRawDataBatchReader(Path seriesPath, Set<String> allSensors,
TSDataType dataType,
+ QueryContext context, QueryDataSource dataSource, Filter timeFilter,
Filter valueFilter,
+ TsFileFilter fileFilter) {
+ this.seriesReader = new SeriesReader(seriesPath, allSensors, dataType,
context, dataSource,
+ timeFilter, valueFilter, fileFilter);
}
@TestOnly
public SeriesRawDataBatchReader(Path seriesPath, TSDataType dataType,
QueryContext context,
List<TsFileResource> seqFileResource, List<TsFileResource>
unseqFileResource,
Filter timeFilter, Filter valueFilter) {
- this.seriesReader = new SeriesReader(seriesPath, new HashSet<>(),
dataType, context, seqFileResource,
- unseqFileResource, timeFilter, valueFilter);
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add(seriesPath.getMeasurement());
+ this.seriesReader = new SeriesReader(seriesPath, allSensors, dataType,
context,
+ seqFileResource, unseqFileResource, timeFilter, valueFilter);
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index a2a5a50..7b857cd 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.timegenerator;
+import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.MManager;
@@ -33,10 +34,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import java.io.IOException;
-
/**
- * A timestamp generator for query with filter. e.g. For query clause "select
s1, s2 form root where
+ * A timestamp generator for query with filter. e.g. For query clause "select
s1, s2 from root where
* s3 < 0 and time > 100", this class can iterate back to every timestamp of
the query.
*/
public class ServerTimeGenerator extends TimeGenerator {
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
index 8291e93..6a17faa 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
@@ -18,6 +18,18 @@
*/
package org.apache.iotdb.db.integration;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -38,15 +50,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
-import static org.junit.Assert.*;
-
/**
* Notice that, all test begins with "IoTDB" is integration test. All test
which will start the
* IoTDB server should be defined as integration test.
@@ -129,7 +132,7 @@ public class IoTDBEngineTimeGeneratorIT {
}
}
- statement.execute("flush");
+ statement.execute("FLUSH");
// insert data (time from 1200-1499)
for (long time = 1200; time < 1500; time++) {
@@ -210,8 +213,13 @@ public class IoTDBEngineTimeGeneratorIT {
ValueFilter.ValueGtEq valueGtEq = ValueFilter.gtEq(5);
IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0,
valueGtEq);
+ RawDataQueryPlan queryPlan = new RawDataQueryPlan();
+ List<Path> paths = new ArrayList<>();
+ paths.add(pd1s0);
+ queryPlan.setDeduplicatedPaths(paths);
+
ServerTimeGenerator timeGenerator = new
ServerTimeGenerator(singleSeriesExpression,
- TEST_QUERY_CONTEXT, new RawDataQueryPlan());
+ TEST_QUERY_CONTEXT, queryPlan);
int cnt = 0;
while (timeGenerator.hasNext()) {
@@ -243,8 +251,13 @@ public class IoTDBEngineTimeGeneratorIT {
IExpression andExpression = BinaryExpression
.and(singleSeriesExpression1, singleSeriesExpression2);
+ RawDataQueryPlan queryPlan = new RawDataQueryPlan();
+ List<Path> paths = new ArrayList<>();
+ paths.add(pd0s0);
+ paths.add(pd0s2);
+ queryPlan.setDeduplicatedPaths(paths);
ServerTimeGenerator timeGenerator = new ServerTimeGenerator(andExpression,
- TEST_QUERY_CONTEXT, new RawDataQueryPlan());
+ TEST_QUERY_CONTEXT, queryPlan);
int cnt = 0;
while (timeGenerator.hasNext()) {
long time = timeGenerator.next();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 275521e..28fe5df 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -120,6 +120,6 @@ public class MetadataIndexNode {
return mid; // key found
}
}
- return low - 1; // key not found
+ return low == 0 ? low : low - 1; // key not found
}
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/MetadataIndexNodeType.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/MetadataIndexNodeType.java
index cb0a2c2..ecc73b3 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/MetadataIndexNodeType.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/MetadataIndexNodeType.java
@@ -23,6 +23,15 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+
+/**
+ * The type of MetadataIndexNode
+ *
+ * INTERNAL_DEVICE: internal nodes of the index tree's device level
+ * LEAF_DEVICE: leaf nodes of the index tree's device level, points to
measurement level
+ * INTERNAL_MEASUREMENT: internal nodes of the index tree's measurement level
+ * LEAF_MEASUREMENT: leaf nodes of the index tree's measurement level, points to
TimeseriesMetadata
+ */
public enum MetadataIndexNodeType {
INTERNAL_DEVICE, LEAF_DEVICE, INTERNAL_MEASUREMENT, LEAF_MEASUREMENT;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 1e3c8a7..efb5c31 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -34,7 +33,6 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -178,9 +176,8 @@ public class TsFileSequenceReader implements AutoCloseable {
metadataSize.flip();
// read file metadata size and position
fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
- fileMetadataPos =
- tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length -
Integer.BYTES
- - fileMetadataSize;
+ fileMetadataPos = tsFileInput.size() -
TSFileConfig.MAGIC_STRING.getBytes().length
+ - Integer.BYTES - fileMetadataSize;
}
}
@@ -314,13 +311,13 @@ public class TsFileSequenceReader implements
AutoCloseable {
public TimeseriesMetadata readTimeseriesMetadata(Path path) throws
IOException {
readFileMetadata();
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetaDataAndEndOffset(
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
deviceMetadataIndexNode, path.getDevice(),
MetadataIndexNodeType.INTERNAL_DEVICE);
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
while (!metadataIndexPair.left.getChildNodeType()
.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
MetadataIndexNode metadataIndexNode =
MetadataIndexNode.deserializeFrom(buffer);
- metadataIndexPair = getMetaDataAndEndOffset(metadataIndexNode,
+ metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode,
path.getMeasurement(), MetadataIndexNodeType.INTERNAL_MEASUREMENT);
}
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
@@ -328,14 +325,101 @@ public class TsFileSequenceReader implements
AutoCloseable {
while (buffer.hasRemaining()) {
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
}
- String[] measurementNameList = timeseriesMetadataList.stream()
- .map(TimeseriesMetadata::getMeasurementId).collect(Collectors.toList())
- .toArray(new String[timeseriesMetadataList.size()]);
-
// return null if path does not exist in the TsFile
- int searchResult;
- return (searchResult = Arrays.binarySearch(measurementNameList,
path.getMeasurement())) >= 0
- ? timeseriesMetadataList.get(searchResult) : null;
+ int searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
+ path.getMeasurement());
+ return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
+ }
+
+ public List<TimeseriesMetadata> readTimeseriesMetadata(String device,
Set<String> measurements)
+ throws IOException {
+ readFileMetadata();
+ MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getMetadataIndex();
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
+ deviceMetadataIndexNode, device,
MetadataIndexNodeType.INTERNAL_DEVICE);
+ List<TimeseriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
+ int maxDegreeOfIndexNode = config.getMaxDegreeOfIndexNode();
+ if (measurements.size() > maxDegreeOfIndexNode /
Math.log(maxDegreeOfIndexNode)) {
+
traverseAndReadTimeseriesMetadataInOneDevice(resultTimeseriesMetadataList,
metadataIndexPair,
+ measurements);
+ return resultTimeseriesMetadataList;
+ }
+ for (String measurement : measurements) {
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair =
metadataIndexPair;
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ while (!measurementMetadataIndexPair.left.getChildNodeType()
+ .equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ MetadataIndexNode metadataIndexNode =
MetadataIndexNode.deserializeFrom(buffer);
+ measurementMetadataIndexPair =
getMetadataAndEndOffset(metadataIndexNode,
+ measurement, MetadataIndexNodeType.INTERNAL_MEASUREMENT);
+ }
+ buffer = readData(measurementMetadataIndexPair.left.getOffset(),
+ measurementMetadataIndexPair.right);
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
+ }
+ int searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
+ measurement);
+ if (searchResult >= 0) {
+
resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
+ }
+ }
+ return resultTimeseriesMetadataList;
+ }
+
+ private void traverseAndReadTimeseriesMetadataInOneDevice(
+ List<TimeseriesMetadata> timeseriesMetadataList,
+ Pair<MetadataIndexEntry, Long> metadataIndexPair, Set<String>
measurements)
+ throws IOException {
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ switch (metadataIndexPair.left.getChildNodeType()) {
+ case LEAF_DEVICE:
+ case INTERNAL_MEASUREMENT:
+ MetadataIndexNode metadataIndexNode =
MetadataIndexNode.deserializeFrom(buffer);
+ int metadataIndexListSize = metadataIndexNode.getChildren().size();
+ for (int i = 0; i < metadataIndexListSize; i++) {
+ long endOffset = metadataIndexNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
+ }
+ traverseAndReadTimeseriesMetadataInOneDevice(timeseriesMetadataList,
+ new Pair<>(metadataIndexNode.getChildren().get(i), endOffset),
measurements);
+ }
+ break;
+ case LEAF_MEASUREMENT:
+ while (buffer.hasRemaining()) {
+ TimeseriesMetadata timeseriesMetadata =
TimeseriesMetadata.deserializeFrom(buffer);
+ if (measurements.contains(timeseriesMetadata.getMeasurementId())) {
+ timeseriesMetadataList.add(timeseriesMetadata);
+ }
+ }
+ break;
+ default:
+ throw new IOException("Failed to traverse and read TimeseriesMetadata
in device: " +
+ metadataIndexPair.left.getName() + ". Wrong MetadataIndexEntry
type.");
+ }
+ }
+
+ private int binarySearchInTimeseriesMetadataList(List<TimeseriesMetadata>
timeseriesMetadataList,
+ String key) {
+ int low = 0;
+ int high = timeseriesMetadataList.size() - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ TimeseriesMetadata midVal = timeseriesMetadataList.get(mid);
+ int cmp = midVal.getMeasurementId().compareTo(key);
+
+ if (cmp < 0) {
+ low = mid + 1;
+ } else if (cmp > 0) {
+ high = mid - 1;
+ } else {
+ return mid; // key found
+ }
+ }
+ return -1; // key not found
}
public List<String> getAllDevices() throws IOException {
@@ -489,7 +573,7 @@ public class TsFileSequenceReader implements AutoCloseable {
private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device)
throws IOException {
MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetaDataAndEndOffset(
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
metadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE);
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new
TreeMap<>();
@@ -501,14 +585,26 @@ public class TsFileSequenceReader implements
AutoCloseable {
return deviceTimeseriesMetadata;
}
- private Pair<MetadataIndexEntry, Long>
getMetaDataAndEndOffset(MetadataIndexNode metadataIndex,
+ /**
+ * Get target MetadataIndexEntry and its end offset
+ *
+ * @param metadataIndex given MetadataIndexNode
+ * @param name target device / measurement name
+ * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or
INTERNAL_MEASUREMENT. When
+ * searching for a device node, return when it is not INTERNAL_DEVICE.
Likewise, when searching
+ * for a measurement node, return when it is not INTERNAL_MEASUREMENT. This
works for the
+ * situation when the index tree does NOT have the device level and ONLY has
the measurement
+ * level.
+ * @return target MetadataIndexEntry, endOffset pair
+ */
+ private Pair<MetadataIndexEntry, Long>
getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
String name, MetadataIndexNodeType type) throws IOException {
Pair<MetadataIndexEntry, Long> childIndexEntry =
metadataIndex.getChildIndexEntry(name);
if (!childIndexEntry.left.getChildNodeType().equals(type)) {
return childIndexEntry;
}
ByteBuffer buffer = readData(childIndexEntry.left.getOffset(),
childIndexEntry.right);
- return getMetaDataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer),
name, type);
+ return getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer),
name, type);
}
/**
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
index bb58ec4..02582f0 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -115,13 +115,11 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
continue;
}
- Map<String, TimeseriesMetadata> timeseriesMetaDataInDevice = tsFileReader
- .readDeviceMetadata(selectedDevice);
+ List<TimeseriesMetadata> timeseriesMetaDataList = tsFileReader
+ .readTimeseriesMetadata(selectedDevice, selectedMeasurements);
List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
- for (Map.Entry<String, TimeseriesMetadata> entry :
timeseriesMetaDataInDevice.entrySet()) {
- if (selectedMeasurements.contains(entry.getKey())) {
-
chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(entry.getValue()));
- }
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetaDataList) {
+
chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(timeseriesMetadata));
}
// d1
for (ChunkMetadata chunkMetaData : chunkMetadataList) {