This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/tsFile_v4 by this push:
     new 3d593a98 start to work on SingleDeviceRecordReader
3d593a98 is described below

commit 3d593a98331110245c98912e2c83a0841437cc28
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 12:15:57 2024 +0800

    start to work on SingleDeviceRecordReader
---
 .../tsfile/exception/read/NoColumnException.java   |  8 ++
 .../ReadProcessException.java}                     | 17 ++--
 .../UnsupportedOrderingException.java}             | 11 +--
 .../exception/write/NoMeasurementException.java    |  4 +-
 .../tsfile/read/controller/DeviceMetaIterator.java | 91 ++++++++++++++++++++++
 .../tsfile/read/controller/IMetadataQuerier.java   |  6 ++
 .../read/controller/MetadataQuerierByFileImpl.java |  8 ++
 .../tsfile/read/query/executor/QueryExecutor.java  |  7 +-
 .../tsfile/read/query/executor/TsFileExecutor.java | 63 ++++++++++++---
 .../DeviceQueryTask.java}                          | 28 ++++---
 .../query/executor/task/DeviceTaskIterator.java    | 33 ++++++++
 .../reader/block/DeviceOrderedTsBlockReader.java   | 55 +++++++++++++
 .../reader/block/SingleDeviceTsBlockReader.java    | 36 +++++++++
 .../TsBlockReader.java}                            |  6 +-
 14 files changed, 329 insertions(+), 44 deletions(-)

diff --git 
a/tsfile/src/main/java/org/apache/tsfile/exception/read/NoColumnException.java 
b/tsfile/src/main/java/org/apache/tsfile/exception/read/NoColumnException.java
new file mode 100644
index 00000000..e52f0835
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/exception/read/NoColumnException.java
@@ -0,0 +1,8 @@
+package org.apache.tsfile.exception.read;
+
+public class NoColumnException extends ReadProcessException{
+
+  public NoColumnException(String columnName) {
+    super(String.format("No column: %s", columnName));
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
 
b/tsfile/src/main/java/org/apache/tsfile/exception/read/ReadProcessException.java
similarity index 71%
copy from 
tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
copy to 
tsfile/src/main/java/org/apache/tsfile/exception/read/ReadProcessException.java
index 5e5745e2..1e9daf17 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/exception/read/ReadProcessException.java
@@ -17,14 +17,19 @@
  * under the License.
  */
 
-package org.apache.tsfile.exception.write;
+package org.apache.tsfile.exception.read;
 
-/** This exception means it can not find the measurement while writing a 
TSRecord. */
-public class NoMeasurementException extends WriteProcessException {
+public class ReadProcessException extends Exception{
 
-  private static final long serialVersionUID = -5599767368831572747L;
+  public ReadProcessException(String message) {
+    super(message);
+  }
+
+  public ReadProcessException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
-  public NoMeasurementException(String msg) {
-    super(msg);
+  public ReadProcessException(Throwable cause) {
+    super(cause);
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
 
b/tsfile/src/main/java/org/apache/tsfile/exception/read/UnsupportedOrderingException.java
similarity index 71%
copy from 
tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
copy to 
tsfile/src/main/java/org/apache/tsfile/exception/read/UnsupportedOrderingException.java
index 5e5745e2..6b7120c7 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/exception/read/UnsupportedOrderingException.java
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-package org.apache.tsfile.exception.write;
+package org.apache.tsfile.exception.read;
 
-/** This exception means it can not find the measurement while writing a 
TSRecord. */
-public class NoMeasurementException extends WriteProcessException {
+public class UnsupportedOrderingException extends ReadProcessException {
 
-  private static final long serialVersionUID = -5599767368831572747L;
-
-  public NoMeasurementException(String msg) {
-    super(msg);
+  public UnsupportedOrderingException(String ordering) {
+    super(String.format("Unsupported ordering: %s", ordering));
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
 
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
index 5e5745e2..b7d3f3a9 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoMeasurementException.java
@@ -24,7 +24,7 @@ public class NoMeasurementException extends 
WriteProcessException {
 
   private static final long serialVersionUID = -5599767368831572747L;
 
-  public NoMeasurementException(String msg) {
-    super(msg);
+  public NoMeasurementException(String columnName) {
+    super(String.format("No measurement for %s", columnName));
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
new file mode 100644
index 00000000..1a3a2594
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/DeviceMetaIterator.java
@@ -0,0 +1,91 @@
+package org.apache.tsfile.read.controller;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import org.apache.tsfile.file.IMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.DeviceMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MeasurementMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeviceMetaIterator implements Iterator<Pair<IDeviceID, 
MetadataIndexNode>> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DeviceMetaIterator.class);
+  private final TsFileSequenceReader tsFileSequenceReader;
+  private final Queue<MetadataIndexNode> metadataIndexNodes = new 
ArrayDeque<>();
+  private final Queue<Pair<IDeviceID, MetadataIndexNode>> resultCache = new 
ArrayDeque<>();
+
+  public DeviceMetaIterator(TsFileSequenceReader tsFileSequenceReader, 
MetadataIndexNode metadataIndexNode) {
+    this.tsFileSequenceReader = tsFileSequenceReader;
+    this.metadataIndexNodes.add(metadataIndexNode);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!resultCache.isEmpty()) {
+      return true;
+    }
+    try {
+      loadResults();
+    } catch (IOException e) {
+      LOGGER.error("Failed to load device meta data", e);
+      return false;
+    }
+
+    return !resultCache.isEmpty();
+  }
+
+  private void loadResults() throws IOException {
+    while (!metadataIndexNodes.isEmpty()) {
+      final MetadataIndexNode currentNode = metadataIndexNodes.poll();
+      final MetadataIndexNodeType nodeType = currentNode.getNodeType();
+      switch (nodeType) {
+        case LEAF_DEVICE:
+          List<IMetadataIndexEntry> leafChildren = currentNode.getChildren();
+          for (int i = 0; i < leafChildren.size(); i++) {
+            IMetadataIndexEntry child = leafChildren.get(i);
+            final IDeviceID deviceID = ((DeviceMetadataIndexEntry) 
child).getDeviceID();
+            long startOffset = child.getOffset();
+            long endOffset = i < leafChildren.size() - 1 ? leafChildren.get(i 
+ 1).getOffset() :
+                currentNode.getEndOffset();
+            final MetadataIndexNode childNode = 
tsFileSequenceReader.readMetadataIndexNode(
+                startOffset, endOffset, false);
+            resultCache.add(new Pair<>(deviceID, childNode));
+          }
+          return;
+        case INTERNAL_DEVICE:
+          List<IMetadataIndexEntry> internalChildren = 
currentNode.getChildren();
+          for (int i = 0; i < internalChildren.size(); i++) {
+            IMetadataIndexEntry child = internalChildren.get(i);
+            long startOffset = child.getOffset();
+            long endOffset = i < internalChildren.size() - 1 ? 
internalChildren.get(i + 1).getOffset() :
+                currentNode.getEndOffset();
+            final MetadataIndexNode childNode = 
tsFileSequenceReader.readMetadataIndexNode(
+                startOffset, endOffset, true);
+            metadataIndexNodes.add(childNode);
+          }
+          break;
+        default:
+          throw new IOException("A non-device node detected: " + currentNode);
+      }
+    }
+  }
+
+  @Override
+  public Pair<IDeviceID, MetadataIndexNode> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return resultCache.poll();
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
index 3aa63a6b..873412c3 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
@@ -19,9 +19,12 @@
 
 package org.apache.tsfile.read.controller;
 
+import java.util.Iterator;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.NoMeasurementException;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.TsFileMetadata;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.read.common.TimeRange;
@@ -29,6 +32,7 @@ import org.apache.tsfile.read.common.TimeRange;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.tsfile.utils.Pair;
 
 public interface IMetadataQuerier {
 
@@ -65,4 +69,6 @@ public interface IMetadataQuerier {
 
   /** clear caches (if used) to release memory. */
   void clear();
+
+  Iterator<Pair<IDeviceID, MetadataIndexNode>> 
deviceIterator(MetadataIndexNode root);
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 64d62442..6d25e473 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.tsfile.read.controller;
 
+import java.util.Iterator;
 import org.apache.tsfile.common.cache.LRUCache;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.AlignedTimeSeriesMetadata;
@@ -26,6 +27,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.TsFileMetadata;
 import org.apache.tsfile.read.TsFileSequenceReader;
@@ -44,6 +46,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.tsfile.utils.Pair;
 
 public class MetadataQuerierByFileImpl implements IMetadataQuerier {
 
@@ -255,4 +258,9 @@ public class MetadataQuerierByFileImpl implements 
IMetadataQuerier {
   public void clear() {
     chunkMetaDataCache.clear();
   }
+
+  @Override
+  public Iterator<Pair<IDeviceID, MetadataIndexNode>> 
deviceIterator(MetadataIndexNode root) {
+    return new DeviceMetaIterator(tsFileReader, root);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
index f709bd08..9f498afb 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
@@ -20,17 +20,18 @@
 package org.apache.tsfile.read.query.executor;
 
 import java.util.List;
+import org.apache.tsfile.exception.read.ReadProcessException;
 import org.apache.tsfile.read.expression.ExpressionTree;
 import org.apache.tsfile.read.expression.QueryExpression;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
-import org.apache.tsfile.read.reader.RecordReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
 
 public interface QueryExecutor {
 
   QueryDataSet execute(QueryExpression queryExpression) throws IOException;
 
-  RecordReader query(String tableName, List<String> columns, ExpressionTree 
timeFilter,
-      ExpressionTree idFilter, ExpressionTree measurementFilter);
+  TsBlockReader query(String tableName, List<String> columns, ExpressionTree 
timeFilter,
+      ExpressionTree idFilter, ExpressionTree measurementFilter) throws 
ReadProcessException;
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
index 4559c539..0a9f40b9 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TsFileExecutor.java
@@ -23,6 +23,9 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.tsfile.exception.read.NoColumnException;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.read.UnsupportedOrderingException;
 import org.apache.tsfile.exception.write.NoMeasurementException;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
@@ -40,8 +43,10 @@ import 
org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.tsfile.read.expression.util.ExpressionOptimizer;
 import org.apache.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.tsfile.read.reader.RecordReader;
-import org.apache.tsfile.read.reader.RecordReader.EmptyRecordReader;
+import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+import org.apache.tsfile.read.reader.block.DeviceOrderedTsBlockReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
+import org.apache.tsfile.read.reader.block.TsBlockReader.EmptyTsBlockReader;
 import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
 import org.apache.tsfile.read.reader.series.EmptyFileSeriesReader;
 import org.apache.tsfile.read.reader.series.FileSeriesReader;
@@ -51,11 +56,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
 
 public class TsFileExecutor implements QueryExecutor {
 
   private IMetadataQuerier metadataQuerier;
   private IChunkLoader chunkLoader;
+  private TableQueryOrdering tableQueryOrdering = TableQueryOrdering.DEVICE;
 
   public TsFileExecutor(IMetadataQuerier metadataQuerier, IChunkLoader 
chunkLoader) {
     this.metadataQuerier = metadataQuerier;
@@ -63,17 +70,31 @@ public class TsFileExecutor implements QueryExecutor {
   }
 
   @Override
-  public RecordReader query(String tableName, List<String> columns, 
ExpressionTree timeFilter,
-      ExpressionTree idFilter, ExpressionTree measurementFilter) {
+  public TsBlockReader query(String tableName, List<String> columns, 
ExpressionTree timeFilter,
+      ExpressionTree idFilter, ExpressionTree measurementFilter) throws 
ReadProcessException {
     TsFileMetadata fileMetadata = metadataQuerier.getWholeFileMetadata();
     MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNodeMap()
         .get(tableName);
     TableSchema tableSchema = fileMetadata.getTableSchemaMap().get(tableName);
     if (tableRoot == null || tableSchema == null) {
-      return new EmptyRecordReader();
+      return new EmptyTsBlockReader();
     }
 
-    return null;
+    ColumnMapping columnMapping = new ColumnMapping();
+    for (int i = 0; i < columns.size(); i++) {
+      String column = columns.get(i);
+      columnMapping.add(column, i, tableSchema);
+    }
+
+    DeviceTaskIterator deviceTaskIterator = new DeviceTaskIterator(columns, 
tableRoot,
+        columnMapping, metadataQuerier);
+    switch (tableQueryOrdering) {
+      case DEVICE:
+        return new DeviceOrderedTsBlockReader(deviceTaskIterator, 
metadataQuerier, chunkLoader);
+      case TIME:
+      default:
+        throw new UnsupportedOrderingException(tableQueryOrdering.toString());
+    }
   }
 
   @Override
@@ -218,16 +239,36 @@ public class TsFileExecutor implements QueryExecutor {
     return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, 
readersOfSelectedSeries);
   }
 
-  private class ColumnMapping {
+  public class ColumnMapping {
     /**
      * The same column may occur multiple times in a query, but we surely do 
not want to read it redundantly.
      * This mapping is used to put data of the same series into multiple 
columns.
      */
-    private Map<String, List<Integer>> columnPosMapping = new HashMap<>();
-    private Map<String, Boolean> isId = new HashMap<>();
+    private Map<String, List<Integer>> columnPosMap = new HashMap<>();
+    private Map<String, Boolean> isIdMap = new HashMap<>();
 
-    private void add(String columnName, int i, TableSchema schema) throws 
NoMeasurementException {
-      schema.getColumnSchemas()
+    public void add(String columnName, int i, TableSchema schema) throws 
NoColumnException {
+      final int columnIndex = schema.findColumnIndex(columnName);
+      if (columnIndex < 0) {
+        throw new NoColumnException(columnName);
+      }
+
+      final ColumnType columnType = schema.getColumnTypes().get(columnIndex);
+      columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
+      isIdMap.put(columnName, columnType.equals(ColumnType.ID));
     }
+
+    public List<Integer> getColumnPos(String columnName) {
+      return columnPosMap.getOrDefault(columnName, Collections.emptyList());
+    }
+
+    public boolean isId(String columnName) {
+      return isIdMap.getOrDefault(columnName, false);
+    }
+  }
+
+  public enum TableQueryOrdering {
+    TIME,
+    DEVICE
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
similarity index 54%
copy from 
tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
copy to 
tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
index f709bd08..46e9d24c 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/QueryExecutor.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
@@ -17,20 +17,24 @@
  * under the License.
  */
 
-package org.apache.tsfile.read.query.executor;
+package org.apache.tsfile.read.query.executor.task;
 
 import java.util.List;
-import org.apache.tsfile.read.expression.ExpressionTree;
-import org.apache.tsfile.read.expression.QueryExpression;
-import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
 
-import java.io.IOException;
-import org.apache.tsfile.read.reader.RecordReader;
+public class DeviceQueryTask {
+  private IDeviceID deviceID;
+  private List<String> columnNames;
+  private ColumnMapping columnMapping;
+  private MetadataIndexNode indexRoot;
 
-public interface QueryExecutor {
-
-  QueryDataSet execute(QueryExpression queryExpression) throws IOException;
-
-  RecordReader query(String tableName, List<String> columns, ExpressionTree 
timeFilter,
-      ExpressionTree idFilter, ExpressionTree measurementFilter);
+  public DeviceQueryTask(IDeviceID deviceID, List<String> columnNames, 
ColumnMapping columnMapping,
+      MetadataIndexNode indexRoot) {
+    this.deviceID = deviceID;
+    this.columnNames = columnNames;
+    this.columnMapping = columnMapping;
+    this.indexRoot = indexRoot;
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
new file mode 100644
index 00000000..cdcb78d0
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
@@ -0,0 +1,33 @@
+package org.apache.tsfile.read.query.executor.task;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
+import org.apache.tsfile.utils.Pair;
+
+public class DeviceTaskIterator implements Iterator<DeviceQueryTask> {
+  private List<String> columnNames;
+  private ColumnMapping columnMapping;
+  private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
+
+  public DeviceTaskIterator(List<String> columnNames, MetadataIndexNode 
indexRoot,
+      ColumnMapping columnMapping, IMetadataQuerier metadataQuerier) {
+    this.columnNames = columnNames;
+    this.columnMapping = columnMapping;
+    this.deviceMetaIterator = metadataQuerier.deviceIterator(indexRoot);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return deviceMetaIterator.hasNext();
+  }
+
+  @Override
+  public DeviceQueryTask next() {
+    final Pair<IDeviceID, MetadataIndexNode> next = deviceMetaIterator.next();
+    return new DeviceQueryTask(next.left, columnNames, columnMapping, 
next.right);
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
new file mode 100644
index 00000000..a86a628b
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
@@ -0,0 +1,55 @@
+package org.apache.tsfile.read.reader.block;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.controller.IChunkLoader;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
+import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+
+public class DeviceOrderedTsBlockReader implements TsBlockReader {
+
+  private final DeviceTaskIterator taskIterator;
+  private final IMetadataQuerier metadataQuerier;
+  private final IChunkLoader chunkLoader;
+  private SingleDeviceTsBlockReader currentReader;
+
+  public DeviceOrderedTsBlockReader(DeviceTaskIterator taskIterator,
+      IMetadataQuerier metadataQuerier,
+      IChunkLoader chunkLoader) {
+    this.taskIterator = taskIterator;
+    this.metadataQuerier = metadataQuerier;
+    this.chunkLoader = chunkLoader;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (currentReader != null && currentReader.hasNext()) {
+      return true;
+    }
+    while (taskIterator.hasNext()) {
+      final DeviceQueryTask nextTask = taskIterator.next();
+      currentReader = new SingleDeviceTsBlockReader(nextTask, metadataQuerier, 
chunkLoader);
+      if (currentReader.hasNext()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public TsBlock next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return currentReader.next();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (currentReader != null) {
+      currentReader.close();
+    }
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
new file mode 100644
index 00000000..59daad83
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
@@ -0,0 +1,36 @@
+package org.apache.tsfile.read.reader.block;
+
+import java.io.IOException;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.controller.IChunkLoader;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
+
+public class SingleDeviceTsBlockReader implements TsBlockReader {
+
+  private DeviceQueryTask task;
+  private IMetadataQuerier metadataQuerier;
+  private IChunkLoader chunkLoader;
+
+  public SingleDeviceTsBlockReader(DeviceQueryTask task, IMetadataQuerier 
metadataQuerier,
+      IChunkLoader chunkLoader) {
+    this.task = task;
+    this.metadataQuerier = metadataQuerier;
+    this.chunkLoader = chunkLoader;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public TsBlock next() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // nothing to be done
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/RecordReader.java 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/TsBlockReader.java
similarity index 88%
rename from tsfile/src/main/java/org/apache/tsfile/read/reader/RecordReader.java
rename to 
tsfile/src/main/java/org/apache/tsfile/read/reader/block/TsBlockReader.java
index 782c9809..f542ac7d 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/reader/RecordReader.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/TsBlockReader.java
@@ -17,16 +17,16 @@
  * under the License.
  */
 
-package org.apache.tsfile.read.reader;
+package org.apache.tsfile.read.reader.block;
 
 import java.io.IOException;
 import org.apache.tsfile.read.common.block.TsBlock;
 
-public interface RecordReader extends AutoCloseable {
+public interface TsBlockReader extends AutoCloseable {
   boolean hasNext();
   TsBlock next() throws IOException;
 
-  class EmptyRecordReader implements RecordReader{
+  class EmptyTsBlockReader implements TsBlockReader {
 
     @Override
     public boolean hasNext() {

Reply via email to