This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new 53defb89 apply id filter
53defb89 is described below
commit 53defb898c7e75b6e646f16b633fb125570389b2
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 14:26:31 2024 +0800
apply id filter
---
.../tsfile/read/controller/DeviceMetaIterator.java | 64 +++++++++------
.../tsfile/read/controller/IMetadataQuerier.java | 4 +-
.../read/controller/MetadataQuerierByFileImpl.java | 6 +-
.../tsfile/read/query/executor/QueryExecutor.java | 3 -
.../read/query/executor/TableQueryExecutor.java | 95 ++++++++++++++++++++++
.../tsfile/read/query/executor/TsFileExecutor.java | 76 -----------------
.../read/query/executor/task/DeviceQueryTask.java | 16 ++++
.../query/executor/task/DeviceTaskIterator.java | 7 +-
.../reader/block/SingleDeviceTsBlockReader.java | 2 +
9 files changed, 164 insertions(+), 109 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
index 1a3a2594..95e9834a 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
@@ -9,11 +9,10 @@ import java.util.Queue;
import org.apache.tsfile.file.IMetadataIndexEntry;
import org.apache.tsfile.file.metadata.DeviceMetadataIndexEntry;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.MeasurementMetadataIndexEntry;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,10 +23,13 @@ public class DeviceMetaIterator implements
Iterator<Pair<IDeviceID, MetadataInde
private final TsFileSequenceReader tsFileSequenceReader;
private final Queue<MetadataIndexNode> metadataIndexNodes = new
ArrayDeque<>();
private final Queue<Pair<IDeviceID, MetadataIndexNode>> resultCache = new
ArrayDeque<>();
+ private final ExpressionTree idFilter;
- public DeviceMetaIterator(TsFileSequenceReader tsFileSequenceReader,
MetadataIndexNode metadataIndexNode) {
+ public DeviceMetaIterator(TsFileSequenceReader tsFileSequenceReader,
MetadataIndexNode metadataIndexNode,
+ ExpressionTree idFilter) {
this.tsFileSequenceReader = tsFileSequenceReader;
this.metadataIndexNodes.add(metadataIndexNode);
+ this.idFilter = idFilter;
}
@Override
@@ -45,35 +47,49 @@ public class DeviceMetaIterator implements
Iterator<Pair<IDeviceID, MetadataInde
return !resultCache.isEmpty();
}
+ private void loadLeafDevice(MetadataIndexNode currentNode) throws
IOException {
+ List<IMetadataIndexEntry> leafChildren = currentNode.getChildren();
+ for (int i = 0; i < leafChildren.size(); i++) {
+ IMetadataIndexEntry child = leafChildren.get(i);
+ final IDeviceID deviceID = ((DeviceMetadataIndexEntry)
child).getDeviceID();
+ if (idFilter != null && !idFilter.satisfy(deviceID)) {
+ continue;
+ }
+
+ long startOffset = child.getOffset();
+ long endOffset = i < leafChildren.size() - 1 ? leafChildren.get(i +
1).getOffset() :
+ currentNode.getEndOffset();
+ final MetadataIndexNode childNode =
tsFileSequenceReader.readMetadataIndexNode(
+ startOffset, endOffset, false);
+ resultCache.add(new Pair<>(deviceID, childNode));
+ }
+ }
+
+ private void loadInternalNode(MetadataIndexNode currentNode) throws
IOException {
+ List<IMetadataIndexEntry> internalChildren = currentNode.getChildren();
+ for (int i = 0; i < internalChildren.size(); i++) {
+ IMetadataIndexEntry child = internalChildren.get(i);
+ long startOffset = child.getOffset();
+ long endOffset = i < internalChildren.size() - 1 ?
internalChildren.get(i + 1).getOffset() :
+ currentNode.getEndOffset();
+ final MetadataIndexNode childNode =
tsFileSequenceReader.readMetadataIndexNode(
+ startOffset, endOffset, true);
+ metadataIndexNodes.add(childNode);
+ }
+ }
+
private void loadResults() throws IOException {
while (!metadataIndexNodes.isEmpty()) {
final MetadataIndexNode currentNode = metadataIndexNodes.poll();
final MetadataIndexNodeType nodeType = currentNode.getNodeType();
switch (nodeType) {
case LEAF_DEVICE:
- List<IMetadataIndexEntry> leafChildren = currentNode.getChildren();
- for (int i = 0; i < leafChildren.size(); i++) {
- IMetadataIndexEntry child = leafChildren.get(i);
- final IDeviceID deviceID = ((DeviceMetadataIndexEntry)
child).getDeviceID();
- long startOffset = child.getOffset();
- long endOffset = i < leafChildren.size() - 1 ? leafChildren.get(i
+ 1).getOffset() :
- currentNode.getEndOffset();
- final MetadataIndexNode childNode =
tsFileSequenceReader.readMetadataIndexNode(
- startOffset, endOffset, false);
- resultCache.add(new Pair<>(deviceID, childNode));
+ loadLeafDevice(currentNode);
+ if (!resultCache.isEmpty()) {
+ return;
}
- return;
case INTERNAL_DEVICE:
- List<IMetadataIndexEntry> internalChildren =
currentNode.getChildren();
- for (int i = 0; i < internalChildren.size(); i++) {
- IMetadataIndexEntry child = internalChildren.get(i);
- long startOffset = child.getOffset();
- long endOffset = i < internalChildren.size() - 1 ?
internalChildren.get(i + 1).getOffset() :
- currentNode.getEndOffset();
- final MetadataIndexNode childNode =
tsFileSequenceReader.readMetadataIndexNode(
- startOffset, endOffset, true);
- metadataIndexNodes.add(childNode);
- }
+ loadInternalNode(currentNode);
break;
default:
throw new IOException("A non-device node detected: " + currentNode);
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
index 873412c3..09741943 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
@@ -32,6 +32,7 @@ import org.apache.tsfile.read.common.TimeRange;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.utils.Pair;
public interface IMetadataQuerier {
@@ -70,5 +71,6 @@ public interface IMetadataQuerier {
/** clear caches (if used) to release memory. */
void clear();
- Iterator<Pair<IDeviceID, MetadataIndexNode>>
deviceIterator(MetadataIndexNode root);
+ Iterator<Pair<IDeviceID, MetadataIndexNode>>
deviceIterator(MetadataIndexNode root,
+ ExpressionTree idFilter);
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 6d25e473..d7fa458f 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -46,6 +46,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.utils.Pair;
public class MetadataQuerierByFileImpl implements IMetadataQuerier {
@@ -260,7 +261,8 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
}
@Override
- public Iterator<Pair<IDeviceID, MetadataIndexNode>>
deviceIterator(MetadataIndexNode root) {
- return new DeviceMetaIterator(tsFileReader, root);
+ public Iterator<Pair<IDeviceID, MetadataIndexNode>>
deviceIterator(MetadataIndexNode root,
+ ExpressionTree idFilter) {
+ return new DeviceMetaIterator(tsFileReader, root, idFilter);
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
index 9f498afb..ae40c2d9 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
@@ -31,7 +31,4 @@ import org.apache.tsfile.read.reader.block.TsBlockReader;
public interface QueryExecutor {
QueryDataSet execute(QueryExpression queryExpression) throws IOException;
-
- TsBlockReader query(String tableName, List<String> columns, ExpressionTree
timeFilter,
- ExpressionTree idFilter, ExpressionTree measurementFilter) throws
ReadProcessException;
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
new file mode 100644
index 00000000..112acf66
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
@@ -0,0 +1,95 @@
+package org.apache.tsfile.read.query.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.tsfile.exception.read.NoColumnException;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.read.UnsupportedOrderingException;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.read.controller.IChunkLoader;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.expression.ExpressionTree;
+import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+import org.apache.tsfile.read.reader.block.DeviceOrderedTsBlockReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader.EmptyTsBlockReader;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
+
+public class TableQueryExecutor {
+
+ private IMetadataQuerier metadataQuerier;
+ private IChunkLoader chunkLoader;
+ private TableQueryOrdering tableQueryOrdering;
+
+ public TableQueryExecutor(IMetadataQuerier metadataQuerier, IChunkLoader
chunkLoader,
+ TableQueryOrdering tableQueryOrdering) {
+ this.metadataQuerier = metadataQuerier;
+ this.chunkLoader = chunkLoader;
+ this.tableQueryOrdering = tableQueryOrdering;
+ }
+
+ public TsBlockReader query(String tableName, List<String> columns,
ExpressionTree timeFilter,
+ ExpressionTree idFilter, ExpressionTree measurementFilter) throws
ReadProcessException {
+ TsFileMetadata fileMetadata = metadataQuerier.getWholeFileMetadata();
+ MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNodeMap()
+ .get(tableName);
+ TableSchema tableSchema = fileMetadata.getTableSchemaMap().get(tableName);
+ if (tableRoot == null || tableSchema == null) {
+ return new EmptyTsBlockReader();
+ }
+
+ ColumnMapping columnMapping = new ColumnMapping();
+ for (int i = 0; i < columns.size(); i++) {
+ String column = columns.get(i);
+ columnMapping.add(column, i, tableSchema);
+ }
+
+ DeviceTaskIterator deviceTaskIterator = new DeviceTaskIterator(columns,
tableRoot,
+ columnMapping, metadataQuerier, idFilter);
+ switch (tableQueryOrdering) {
+ case DEVICE:
+ return new DeviceOrderedTsBlockReader(deviceTaskIterator,
metadataQuerier, chunkLoader);
+ case TIME:
+ default:
+ throw new UnsupportedOrderingException(tableQueryOrdering.toString());
+ }
+ }
+
+ public class ColumnMapping {
+ /**
+ * The same column may occur multiple times in a query, but we surely do
not want to read it redundantly.
+ * This mapping is used to put data of the same series into multiple
columns.
+ */
+ private Map<String, List<Integer>> columnPosMap = new HashMap<>();
+ private Map<String, Boolean> isIdMap = new HashMap<>();
+
+ public void add(String columnName, int i, TableSchema schema) throws
NoColumnException {
+ final int columnIndex = schema.findColumnIndex(columnName);
+ if (columnIndex < 0) {
+ throw new NoColumnException(columnName);
+ }
+
+ final ColumnType columnType = schema.getColumnTypes().get(columnIndex);
+ columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
+ isIdMap.put(columnName, columnType.equals(ColumnType.ID));
+ }
+
+ public List<Integer> getColumnPos(String columnName) {
+ return columnPosMap.getOrDefault(columnName, Collections.emptyList());
+ }
+
+ public boolean isId(String columnName) {
+ return isIdMap.getOrDefault(columnName, false);
+ }
+ }
+
+ public enum TableQueryOrdering {
+ TIME,
+ DEVICE
+ }
+}
\ No newline at end of file
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
index 0a9f40b9..1ba683ac 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
@@ -19,23 +19,14 @@
package org.apache.tsfile.read.query.executor;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.tsfile.exception.read.NoColumnException;
-import org.apache.tsfile.exception.read.ReadProcessException;
-import org.apache.tsfile.exception.read.UnsupportedOrderingException;
import org.apache.tsfile.exception.write.NoMeasurementException;
import org.apache.tsfile.file.metadata.IChunkMetadata;
-import org.apache.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.tsfile.file.metadata.TableSchema;
-import org.apache.tsfile.file.metadata.TsFileMetadata;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.controller.IChunkLoader;
import org.apache.tsfile.read.controller.IMetadataQuerier;
-import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.read.expression.IExpression;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.expression.impl.BinaryExpression;
@@ -43,10 +34,6 @@ import
org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
-import org.apache.tsfile.read.reader.block.DeviceOrderedTsBlockReader;
-import org.apache.tsfile.read.reader.block.TsBlockReader;
-import org.apache.tsfile.read.reader.block.TsBlockReader.EmptyTsBlockReader;
import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.tsfile.read.reader.series.EmptyFileSeriesReader;
import org.apache.tsfile.read.reader.series.FileSeriesReader;
@@ -56,47 +43,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.tsfile.write.record.Tablet.ColumnType;
public class TsFileExecutor implements QueryExecutor {
private IMetadataQuerier metadataQuerier;
private IChunkLoader chunkLoader;
- private TableQueryOrdering tableQueryOrdering = TableQueryOrdering.DEVICE;
public TsFileExecutor(IMetadataQuerier metadataQuerier, IChunkLoader
chunkLoader) {
this.metadataQuerier = metadataQuerier;
this.chunkLoader = chunkLoader;
}
- @Override
- public TsBlockReader query(String tableName, List<String> columns,
ExpressionTree timeFilter,
- ExpressionTree idFilter, ExpressionTree measurementFilter) throws
ReadProcessException {
- TsFileMetadata fileMetadata = metadataQuerier.getWholeFileMetadata();
- MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNodeMap()
- .get(tableName);
- TableSchema tableSchema = fileMetadata.getTableSchemaMap().get(tableName);
- if (tableRoot == null || tableSchema == null) {
- return new EmptyTsBlockReader();
- }
-
- ColumnMapping columnMapping = new ColumnMapping();
- for (int i = 0; i < columns.size(); i++) {
- String column = columns.get(i);
- columnMapping.add(column, i, tableSchema);
- }
-
- DeviceTaskIterator deviceTaskIterator = new DeviceTaskIterator(columns,
tableRoot,
- columnMapping, metadataQuerier);
- switch (tableQueryOrdering) {
- case DEVICE:
- return new DeviceOrderedTsBlockReader(deviceTaskIterator,
metadataQuerier, chunkLoader);
- case TIME:
- default:
- throw new UnsupportedOrderingException(tableQueryOrdering.toString());
- }
- }
-
@Override
public QueryDataSet execute(QueryExpression queryExpression) throws
IOException {
// bloom filter
@@ -238,37 +195,4 @@ public class TsFileExecutor implements QueryExecutor {
}
return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes,
readersOfSelectedSeries);
}
-
- public class ColumnMapping {
- /**
- * The same column may occur multiple times in a query, but we surely do
not want to read it redundantly.
- * This mapping is used to put data of the same series into multiple
columns.
- */
- private Map<String, List<Integer>> columnPosMap = new HashMap<>();
- private Map<String, Boolean> isIdMap = new HashMap<>();
-
- public void add(String columnName, int i, TableSchema schema) throws
NoColumnException {
- final int columnIndex = schema.findColumnIndex(columnName);
- if (columnIndex < 0) {
- throw new NoColumnException(columnName);
- }
-
- final ColumnType columnType = schema.getColumnTypes().get(columnIndex);
- columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
- isIdMap.put(columnName, columnType.equals(ColumnType.ID));
- }
-
- public List<Integer> getColumnPos(String columnName) {
- return columnPosMap.getOrDefault(columnName, Collections.emptyList());
- }
-
- public boolean isId(String columnName) {
- return isIdMap.getOrDefault(columnName, false);
- }
- }
-
- public enum TableQueryOrdering {
- TIME,
- DEVICE
- }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
index 46e9d24c..3744fe9d 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
@@ -37,4 +37,20 @@ public class DeviceQueryTask {
this.columnMapping = columnMapping;
this.indexRoot = indexRoot;
}
+
+ public IDeviceID getDeviceID() {
+ return deviceID;
+ }
+
+ public List<String> getColumnNames() {
+ return columnNames;
+ }
+
+ public ColumnMapping getColumnMapping() {
+ return columnMapping;
+ }
+
+ public MetadataIndexNode getIndexRoot() {
+ return indexRoot;
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
index cdcb78d0..9e387ff8 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
@@ -5,7 +5,8 @@ import java.util.List;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.read.controller.IMetadataQuerier;
-import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
+import org.apache.tsfile.read.expression.ExpressionTree;
+import org.apache.tsfile.read.query.executor.TableQueryExecutor.ColumnMapping;
import org.apache.tsfile.utils.Pair;
public class DeviceTaskIterator implements Iterator<DeviceQueryTask> {
@@ -14,10 +15,10 @@ public class DeviceTaskIterator implements
Iterator<DeviceQueryTask> {
private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
public DeviceTaskIterator(List<String> columnNames, MetadataIndexNode
indexRoot,
- ColumnMapping columnMapping, IMetadataQuerier metadataQuerier) {
+ ColumnMapping columnMapping, IMetadataQuerier metadataQuerier,
ExpressionTree idFilter) {
this.columnNames = columnNames;
this.columnMapping = columnMapping;
- this.deviceMetaIterator = metadataQuerier.deviceIterator(indexRoot);
+ this.deviceMetaIterator = metadataQuerier.deviceIterator(indexRoot,
idFilter);
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
index 59daad83..f61ec47f 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
@@ -17,6 +17,8 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
this.task = task;
this.metadataQuerier = metadataQuerier;
this.chunkLoader = chunkLoader;
+
+
}
@Override