This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new 3d593a98 start to work on SingleDeviceRecordReader
3d593a98 is described below
commit 3d593a98331110245c98912e2c83a0841437cc28
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 12:15:57 2024 +0800
start to work on SingleDeviceRecordReader
---
.../tsfile/exception/read/NoColumnException.java | 8 ++
.../ReadProcessException.java} | 17 ++--
.../UnsupportedOrderingException.java} | 11 +--
.../exception/write/NoMeasurementException.java | 4 +-
.../tsfile/read/controller/DeviceMetaIterator.java | 91 ++++++++++++++++++++++
.../tsfile/read/controller/IMetadataQuerier.java | 6 ++
.../read/controller/MetadataQuerierByFileImpl.java | 8 ++
.../tsfile/read/query/executor/QueryExecutor.java | 7 +-
.../tsfile/read/query/executor/TsFileExecutor.java | 63 ++++++++++++---
.../DeviceQueryTask.java} | 28 ++++---
.../query/executor/task/DeviceTaskIterator.java | 33 ++++++++
.../reader/block/DeviceOrderedTsBlockReader.java | 55 +++++++++++++
.../reader/block/SingleDeviceTsBlockReader.java | 36 +++++++++
.../TsBlockReader.java} | 6 +-
14 files changed, 329 insertions(+), 44 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/tsfile/exception/read/NoColumnException.java
b/tsfile/src/main/java/org/apache/tsfile/exception/read/NoColumnException.java
new file mode 100644
index 00000000..e52f0835
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/exception/read/NoColumnException.java
@@ -0,0 +1,19 @@
+package org.apache.tsfile.exception.read;
+
+/** Read-path exception signalling that a column with the given name could not be found. */
+public class NoColumnException extends ReadProcessException {
+
+  /**
+   * Builds the exception with the detail message {@code "No column: <columnName>"}.
+   *
+   * @param columnName name of the column that could not be resolved
+   */
+  public NoColumnException(String columnName) {
+    super(describe(columnName));
+  }
+
+  /** Formats the detail message for the given column name. */
+  private static String describe(String columnName) {
+    return String.format("No column: %s", columnName);
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
b/tsfile/src/main/java/org/apache/tsfile/exception/read/ReadProcessException.java
similarity index 71%
copy from
tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
copy to
tsfile/src/main/java/org/apache/tsfile/exception/read/ReadProcessException.java
index 5e5745e2..1e9daf17 100644
---
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
+++
b/tsfile/src/main/java/org/apache/tsfile/exception/read/ReadProcessException.java
@@ -17,14 +17,19 @@
* under the License.
*/
-package org.apache.tsfile.exception.write;
+package org.apache.tsfile.exception.read;
-/** This exception means it can not find the measurement while writing a
TSRecord. */
-public class NoMeasurementException extends WriteProcessException {
+public class ReadProcessException extends Exception{
- private static final long serialVersionUID = -5599767368831572747L;
+ public ReadProcessException(String message) {
+ super(message);
+ }
+
+ public ReadProcessException(String message, Throwable cause) {
+ super(message, cause);
+ }
- public NoMeasurementException(String msg) {
- super(msg);
+ public ReadProcessException(Throwable cause) {
+ super(cause);
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
b/tsfile/src/main/java/org/apache/tsfile/exception/read/UnsupportedOrderingException.java
similarity index 71%
copy from
tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
copy to
tsfile/src/main/java/org/apache/tsfile/exception/read/UnsupportedOrderingException.java
index 5e5745e2..6b7120c7 100644
---
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
+++
b/tsfile/src/main/java/org/apache/tsfile/exception/read/UnsupportedOrderingException.java
@@ -17,14 +17,11 @@
* under the License.
*/
-package org.apache.tsfile.exception.write;
+package org.apache.tsfile.exception.read;
-/** This exception means it can not find the measurement while writing a
TSRecord. */
-public class NoMeasurementException extends WriteProcessException {
+public class UnsupportedOrderingException extends ReadProcessException {
- private static final long serialVersionUID = -5599767368831572747L;
-
- public NoMeasurementException(String msg) {
- super(msg);
+ public UnsupportedOrderingException(String ordering) {
+ super(String.format("Unsupported ordering: %s", ordering));
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
index 5e5745e2..b7d3f3a9 100644
---
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
+++
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
@@ -24,7 +24,7 @@ public class NoMeasurementException extends
WriteProcessException {
private static final long serialVersionUID = -5599767368831572747L;
- public NoMeasurementException(String msg) {
- super(msg);
+ public NoMeasurementException(String columnName) {
+ super(String.format("No measurement for %s", columnName));
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
new file mode 100644
index 00000000..1a3a2594
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
@@ -0,0 +1,138 @@
+package org.apache.tsfile.read.controller;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import org.apache.tsfile.file.IMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.DeviceMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MeasurementMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iterates over the devices reachable from a device-level metadata index node, yielding each
+ * device id together with the index node read for that device's entry.
+ *
+ * <p>Index nodes are expanded lazily: nodes still to be visited wait in a FIFO queue, and pairs
+ * that were loaded but not yet returned wait in a result cache.
+ */
+public class DeviceMetaIterator implements Iterator<Pair<IDeviceID, MetadataIndexNode>> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DeviceMetaIterator.class);
+  private final TsFileSequenceReader tsFileSequenceReader;
+  private final Queue<MetadataIndexNode> metadataIndexNodes = new ArrayDeque<>();
+  private final Queue<Pair<IDeviceID, MetadataIndexNode>> resultCache = new ArrayDeque<>();
+
+  /**
+   * @param tsFileSequenceReader reader used to load child index nodes
+   * @param metadataIndexNode node at which the traversal starts
+   */
+  public DeviceMetaIterator(
+      TsFileSequenceReader tsFileSequenceReader, MetadataIndexNode metadataIndexNode) {
+    this.tsFileSequenceReader = tsFileSequenceReader;
+    this.metadataIndexNodes.add(metadataIndexNode);
+  }
+
+  /**
+   * Tells whether another device is available, loading more index nodes if the cache is empty.
+   *
+   * <p>An {@link IOException} raised while loading is logged and then reported as "no more
+   * elements"; it is not propagated to the caller.
+   */
+  @Override
+  public boolean hasNext() {
+    if (!resultCache.isEmpty()) {
+      return true;
+    }
+    try {
+      loadResults();
+    } catch (IOException e) {
+      LOGGER.error("Failed to load device meta data", e);
+      return false;
+    }
+
+    return !resultCache.isEmpty();
+  }
+
+  /**
+   * Expands queued nodes until the result cache holds at least one device or the queue is empty.
+   *
+   * @throws IOException if a child node cannot be read or a non-device node is encountered
+   */
+  private void loadResults() throws IOException {
+    while (!metadataIndexNodes.isEmpty()) {
+      final MetadataIndexNode currentNode = metadataIndexNodes.poll();
+      final MetadataIndexNodeType nodeType = currentNode.getNodeType();
+      switch (nodeType) {
+        case LEAF_DEVICE:
+          loadLeafDevice(currentNode);
+          // A leaf without entries adds nothing to the cache; keep draining the queue then, so
+          // that exhaustion is not reported while unexpanded nodes remain.
+          if (!resultCache.isEmpty()) {
+            return;
+          }
+          break;
+        case INTERNAL_DEVICE:
+          loadInternalNode(currentNode);
+          break;
+        default:
+          throw new IOException("A non-device node detected: " + currentNode);
+      }
+    }
+  }
+
+  /** Reads the node behind every device entry of a leaf and caches it with its device id. */
+  private void loadLeafDevice(MetadataIndexNode leafNode) throws IOException {
+    final List<IMetadataIndexEntry> children = leafNode.getChildren();
+    for (int i = 0; i < children.size(); i++) {
+      final IMetadataIndexEntry child = children.get(i);
+      final IDeviceID deviceID = ((DeviceMetadataIndexEntry) child).getDeviceID();
+      final MetadataIndexNode childNode =
+          tsFileSequenceReader.readMetadataIndexNode(
+              child.getOffset(), endOffsetOf(leafNode, children, i), false);
+      resultCache.add(new Pair<>(deviceID, childNode));
+    }
+  }
+
+  /** Reads every child of an internal device node and queues it for later expansion. */
+  private void loadInternalNode(MetadataIndexNode internalNode) throws IOException {
+    final List<IMetadataIndexEntry> children = internalNode.getChildren();
+    for (int i = 0; i < children.size(); i++) {
+      final IMetadataIndexEntry child = children.get(i);
+      final MetadataIndexNode childNode =
+          tsFileSequenceReader.readMetadataIndexNode(
+              child.getOffset(), endOffsetOf(internalNode, children, i), true);
+      metadataIndexNodes.add(childNode);
+    }
+  }
+
+  /** A child ends where its next sibling starts; the last child ends where its parent ends. */
+  private static long endOffsetOf(
+      MetadataIndexNode parent, List<IMetadataIndexEntry> children, int index) {
+    return index < children.size() - 1
+        ? children.get(index + 1).getOffset()
+        : parent.getEndOffset();
+  }
+
+  /**
+   * Returns the next device and its index node.
+   *
+   * @throws NoSuchElementException if no further device is available
+   */
+  @Override
+  public Pair<IDeviceID, MetadataIndexNode> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return resultCache.poll();
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
index 3aa63a6b..873412c3 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
@@ -19,9 +19,12 @@
package org.apache.tsfile.read.controller;
+import java.util.Iterator;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.NoMeasurementException;
import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TsFileMetadata;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
@@ -29,6 +32,7 @@ import org.apache.tsfile.read.common.TimeRange;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.tsfile.utils.Pair;
public interface IMetadataQuerier {
@@ -65,4 +69,6 @@ public interface IMetadataQuerier {
/** clear caches (if used) to release memory. */
void clear();
+
+ Iterator<Pair<IDeviceID, MetadataIndexNode>>
deviceIterator(MetadataIndexNode root);
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 64d62442..6d25e473 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.read.controller;
+import java.util.Iterator;
import org.apache.tsfile.common.cache.LRUCache;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.AlignedTimeSeriesMetadata;
@@ -26,6 +27,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.TsFileMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -44,6 +46,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.tsfile.utils.Pair;
public class MetadataQuerierByFileImpl implements IMetadataQuerier {
@@ -255,4 +258,9 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
public void clear() {
chunkMetaDataCache.clear();
}
+
+ @Override
+ public Iterator<Pair<IDeviceID, MetadataIndexNode>>
deviceIterator(MetadataIndexNode root) {
+ return new DeviceMetaIterator(tsFileReader, root);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
index f709bd08..9f498afb 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
@@ -20,17 +20,18 @@
package org.apache.tsfile.read.query.executor;
import java.util.List;
+import org.apache.tsfile.exception.read.ReadProcessException;
import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
-import org.apache.tsfile.read.reader.RecordReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
public interface QueryExecutor {
QueryDataSet execute(QueryExpression queryExpression) throws IOException;
- RecordReader query(String tableName, List<String> columns, ExpressionTree
timeFilter,
- ExpressionTree idFilter, ExpressionTree measurementFilter);
+ TsBlockReader query(String tableName, List<String> columns, ExpressionTree
timeFilter,
+ ExpressionTree idFilter, ExpressionTree measurementFilter) throws
ReadProcessException;
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
index 4559c539..0a9f40b9 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
@@ -23,6 +23,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.tsfile.exception.read.NoColumnException;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.read.UnsupportedOrderingException;
import org.apache.tsfile.exception.write.NoMeasurementException;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
@@ -40,8 +43,10 @@ import
org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.tsfile.read.reader.RecordReader;
-import org.apache.tsfile.read.reader.RecordReader.EmptyRecordReader;
+import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+import org.apache.tsfile.read.reader.block.DeviceOrderedTsBlockReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader.EmptyTsBlockReader;
import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.tsfile.read.reader.series.EmptyFileSeriesReader;
import org.apache.tsfile.read.reader.series.FileSeriesReader;
@@ -51,11 +56,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
public class TsFileExecutor implements QueryExecutor {
private IMetadataQuerier metadataQuerier;
private IChunkLoader chunkLoader;
+ private TableQueryOrdering tableQueryOrdering = TableQueryOrdering.DEVICE;
public TsFileExecutor(IMetadataQuerier metadataQuerier, IChunkLoader
chunkLoader) {
this.metadataQuerier = metadataQuerier;
@@ -63,17 +70,31 @@ public class TsFileExecutor implements QueryExecutor {
}
@Override
- public RecordReader query(String tableName, List<String> columns,
ExpressionTree timeFilter,
- ExpressionTree idFilter, ExpressionTree measurementFilter) {
+ public TsBlockReader query(String tableName, List<String> columns,
ExpressionTree timeFilter,
+ ExpressionTree idFilter, ExpressionTree measurementFilter) throws
ReadProcessException {
TsFileMetadata fileMetadata = metadataQuerier.getWholeFileMetadata();
MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNodeMap()
.get(tableName);
TableSchema tableSchema = fileMetadata.getTableSchemaMap().get(tableName);
if (tableRoot == null || tableSchema == null) {
- return new EmptyRecordReader();
+ return new EmptyTsBlockReader();
}
- return null;
+ ColumnMapping columnMapping = new ColumnMapping();
+ for (int i = 0; i < columns.size(); i++) {
+ String column = columns.get(i);
+ columnMapping.add(column, i, tableSchema);
+ }
+
+ DeviceTaskIterator deviceTaskIterator = new DeviceTaskIterator(columns,
tableRoot,
+ columnMapping, metadataQuerier);
+ switch (tableQueryOrdering) {
+ case DEVICE:
+ return new DeviceOrderedTsBlockReader(deviceTaskIterator,
metadataQuerier, chunkLoader);
+ case TIME:
+ default:
+ throw new UnsupportedOrderingException(tableQueryOrdering.toString());
+ }
}
@Override
@@ -218,16 +239,36 @@ public class TsFileExecutor implements QueryExecutor {
return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes,
readersOfSelectedSeries);
}
- private class ColumnMapping {
+ public class ColumnMapping {
/**
* The same column may occur multiple times in a query, but we surely do
not want to read it redundantly.
* This mapping is used to put data of the same series into multiple
columns.
*/
- private Map<String, List<Integer>> columnPosMapping = new HashMap<>();
- private Map<String, Boolean> isId = new HashMap<>();
+ private Map<String, List<Integer>> columnPosMap = new HashMap<>();
+ private Map<String, Boolean> isIdMap = new HashMap<>();
- private void add(String columnName, int i, TableSchema schema) throws
NoMeasurementException {
- schema.getColumnSchemas()
+ public void add(String columnName, int i, TableSchema schema) throws
NoColumnException {
+ final int columnIndex = schema.findColumnIndex(columnName);
+ if (columnIndex < 0) {
+ throw new NoColumnException(columnName);
+ }
+
+ final ColumnType columnType = schema.getColumnTypes().get(columnIndex);
+ columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
+ isIdMap.put(columnName, columnType.equals(ColumnType.ID));
}
+
+ public List<Integer> getColumnPos(String columnName) {
+ return columnPosMap.getOrDefault(columnName, Collections.emptyList());
+ }
+
+ public boolean isId(String columnName) {
+ return isIdMap.getOrDefault(columnName, false);
+ }
+ }
+
+ public enum TableQueryOrdering {
+ TIME,
+ DEVICE
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
similarity index 54%
copy from
tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
copy to
tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
index f709bd08..46e9d24c 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
@@ -17,20 +17,24 @@
* under the License.
*/
-package org.apache.tsfile.read.query.executor;
+package org.apache.tsfile.read.query.executor.task;
import java.util.List;
-import org.apache.tsfile.read.expression.ExpressionTree;
-import org.apache.tsfile.read.expression.QueryExpression;
-import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
-import java.io.IOException;
-import org.apache.tsfile.read.reader.RecordReader;
+public class DeviceQueryTask {
+ private IDeviceID deviceID;
+ private List<String> columnNames;
+ private ColumnMapping columnMapping;
+ private MetadataIndexNode indexRoot;
-public interface QueryExecutor {
-
- QueryDataSet execute(QueryExpression queryExpression) throws IOException;
-
- RecordReader query(String tableName, List<String> columns, ExpressionTree
timeFilter,
- ExpressionTree idFilter, ExpressionTree measurementFilter);
+ public DeviceQueryTask(IDeviceID deviceID, List<String> columnNames,
ColumnMapping columnMapping,
+ MetadataIndexNode indexRoot) {
+ this.deviceID = deviceID;
+ this.columnNames = columnNames;
+ this.columnMapping = columnMapping;
+ this.indexRoot = indexRoot;
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
new file mode 100644
index 00000000..cdcb78d0
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
@@ -0,0 +1,50 @@
+package org.apache.tsfile.read.query.executor.task;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
+import org.apache.tsfile.utils.Pair;
+
+/**
+ * Produces one {@link DeviceQueryTask} for every device that the metadata querier's device
+ * iterator reports under a given index root.
+ */
+public class DeviceTaskIterator implements Iterator<DeviceQueryTask> {
+  private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
+  private List<String> columnNames;
+  private ColumnMapping columnMapping;
+
+  /**
+   * @param columnNames column names handed to every created task
+   * @param indexRoot index node whose devices are enumerated
+   * @param columnMapping column mapping handed to every created task
+   * @param metadataQuerier source of the underlying device iterator
+   */
+  public DeviceTaskIterator(
+      List<String> columnNames,
+      MetadataIndexNode indexRoot,
+      ColumnMapping columnMapping,
+      IMetadataQuerier metadataQuerier) {
+    this.deviceMetaIterator = metadataQuerier.deviceIterator(indexRoot);
+    this.columnNames = columnNames;
+    this.columnMapping = columnMapping;
+  }
+
+  /** Delegates to the underlying device iterator. */
+  @Override
+  public boolean hasNext() {
+    return deviceMetaIterator.hasNext();
+  }
+
+  /** Wraps the next device and its index node, plus the shared column info, into a task. */
+  @Override
+  public DeviceQueryTask next() {
+    final Pair<IDeviceID, MetadataIndexNode> deviceMeta = deviceMetaIterator.next();
+    final IDeviceID deviceId = deviceMeta.left;
+    final MetadataIndexNode deviceIndexRoot = deviceMeta.right;
+    return new DeviceQueryTask(deviceId, columnNames, columnMapping, deviceIndexRoot);
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
new file mode 100644
index 00000000..a86a628b
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
@@ -0,0 +1,74 @@
+package org.apache.tsfile.read.reader.block;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.controller.IChunkLoader;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
+import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+
+/**
+ * Reads blocks device by device: each task from the task iterator gets its own
+ * {@link SingleDeviceTsBlockReader}, and the next task is only taken once the current reader has
+ * no more blocks.
+ */
+public class DeviceOrderedTsBlockReader implements TsBlockReader {
+
+  private final DeviceTaskIterator taskIterator;
+  private final IMetadataQuerier metadataQuerier;
+  private final IChunkLoader chunkLoader;
+  private SingleDeviceTsBlockReader currentReader;
+
+  /**
+   * @param taskIterator supplies one query task per device
+   * @param metadataQuerier passed on to every per-device reader
+   * @param chunkLoader passed on to every per-device reader
+   */
+  public DeviceOrderedTsBlockReader(
+      DeviceTaskIterator taskIterator,
+      IMetadataQuerier metadataQuerier,
+      IChunkLoader chunkLoader) {
+    this.taskIterator = taskIterator;
+    this.metadataQuerier = metadataQuerier;
+    this.chunkLoader = chunkLoader;
+  }
+
+  /**
+   * Advances through the remaining tasks until a reader with at least one block is found.
+   *
+   * @return {@code true} if the current reader (possibly a newly created one) has a block
+   */
+  @Override
+  public boolean hasNext() {
+    boolean blockAvailable = currentReader != null && currentReader.hasNext();
+    while (!blockAvailable && taskIterator.hasNext()) {
+      final DeviceQueryTask task = taskIterator.next();
+      currentReader = new SingleDeviceTsBlockReader(task, metadataQuerier, chunkLoader);
+      blockAvailable = currentReader.hasNext();
+    }
+    return blockAvailable;
+  }
+
+  /**
+   * Returns the next block of the current per-device reader.
+   *
+   * @throws NoSuchElementException if no reader has a block left
+   */
+  @Override
+  public TsBlock next() throws IOException {
+    if (hasNext()) {
+      return currentReader.next();
+    }
+    throw new NoSuchElementException();
+  }
+
+  /** Closes the reader that is currently active, if there is one. */
+  @Override
+  public void close() throws Exception {
+    if (currentReader == null) {
+      return;
+    }
+    currentReader.close();
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
new file mode 100644
index 00000000..59daad83
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
@@ -0,0 +1,54 @@
+package org.apache.tsfile.read.reader.block;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.controller.IChunkLoader;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
+
+/**
+ * Block reader for a single {@link DeviceQueryTask}.
+ *
+ * <p>This is a skeleton: it stores its collaborators but does not read any data yet, so it
+ * always reports itself as exhausted.
+ */
+public class SingleDeviceTsBlockReader implements TsBlockReader {
+
+  private final DeviceQueryTask task;
+  private final IMetadataQuerier metadataQuerier;
+  private final IChunkLoader chunkLoader;
+
+  /**
+   * @param task the device query task this reader serves
+   * @param metadataQuerier metadata access, stored for later use
+   * @param chunkLoader chunk access, stored for later use
+   */
+  public SingleDeviceTsBlockReader(
+      DeviceQueryTask task, IMetadataQuerier metadataQuerier, IChunkLoader chunkLoader) {
+    this.task = task;
+    this.metadataQuerier = metadataQuerier;
+    this.chunkLoader = chunkLoader;
+  }
+
+  /** Always {@code false}: no block is ever produced by the current implementation. */
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  /**
+   * Always fails, because {@link #hasNext()} never reports a block.
+   *
+   * @throws NoSuchElementException on every call, instead of handing out {@code null}
+   */
+  @Override
+  public TsBlock next() throws IOException {
+    throw new NoSuchElementException();
+  }
+
+  @Override
+  public void close() throws Exception {
+    // nothing to be done
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/RecordReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/TsBlockReader.java
similarity index 88%
rename from tsfile/src/main/java/org/apache/tsfile/read/reader/RecordReader.java
rename to
tsfile/src/main/java/org/apache/tsfile/read/reader/block/TsBlockReader.java
index 782c9809..f542ac7d 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/reader/RecordReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/TsBlockReader.java
@@ -17,16 +17,16 @@
* under the License.
*/
-package org.apache.tsfile.read.reader;
+package org.apache.tsfile.read.reader.block;
import java.io.IOException;
import org.apache.tsfile.read.common.block.TsBlock;
-public interface RecordReader extends AutoCloseable {
+public interface TsBlockReader extends AutoCloseable {
boolean hasNext();
TsBlock next() throws IOException;
- class EmptyRecordReader implements RecordReader{
+ class EmptyTsBlockReader implements TsBlockReader {
@Override
public boolean hasNext() {