This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this push:
     new 9cb0210675c Add TimeSlice Control for TableScanOperator
9cb0210675c is described below

commit 9cb0210675cc0dc0863b144fd0116511c31e9769
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 27 13:48:21 2024 +0800

    Add TimeSlice Control for TableScanOperator
---
 .../source/AbstractSeriesScanOperator.java         | 10 +--
 .../operator/source/AlignedSeriesScanOperator.java | 15 ++--
 .../source/relational/TableScanOperator.java       | 96 ++++------------------
 3 files changed, 31 insertions(+), 90 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
index d8c07d75caf..c03764bd61c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
@@ -87,7 +87,7 @@ public abstract class AbstractSeriesScanOperator extends 
AbstractDataSourceOpera
     }
   }
 
-  private boolean readFileData() throws IOException {
+  protected boolean readFileData() throws IOException {
     while (seriesScanUtil.hasNextFile()) {
       if (readChunkData()) {
         return true;
@@ -96,7 +96,7 @@ public abstract class AbstractSeriesScanOperator extends 
AbstractDataSourceOpera
     return false;
   }
 
-  private boolean readChunkData() throws IOException {
+  protected boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
         return true;
@@ -105,7 +105,7 @@ public abstract class AbstractSeriesScanOperator extends 
AbstractDataSourceOpera
     return false;
   }
 
-  private boolean readPageData() throws IOException {
+  protected boolean readPageData() throws IOException {
     if (seriesScanUtil.hasNextPage()) {
       TsBlock tsBlock = seriesScanUtil.nextPage();
       if (!isEmpty(tsBlock)) {
@@ -116,11 +116,11 @@ public abstract class AbstractSeriesScanOperator extends 
AbstractDataSourceOpera
     return false;
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
+  protected boolean isEmpty(TsBlock tsBlock) {
     return tsBlock == null || tsBlock.isEmpty();
   }
 
-  private void appendToBuilder(TsBlock tsBlock) {
+  protected void appendToBuilder(TsBlock tsBlock) {
     int size = tsBlock.getPositionCount();
     if (resultTsBlockBuilder.isEmpty() && size >= 
resultTsBlockBuilder.getMaxTsBlockLineNumber()) {
       retainedTsBlock = tsBlock;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index 63605457aec..5ab2a0f0055 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -86,22 +86,27 @@ public class AlignedSeriesScanOperator extends 
AbstractSeriesScanOperator {
 
   @Override
   protected void buildResult(TsBlock tsBlock) {
+    appendDataIntoBuilder(tsBlock, resultTsBlockBuilder);
+  }
+
+  public static void appendDataIntoBuilder(TsBlock tsBlock, TsBlockBuilder 
builder) {
     int size = tsBlock.getPositionCount();
-    TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
     TimeColumn timeColumn = tsBlock.getTimeColumn();
     for (int i = 0; i < size; i++) {
       timeColumnBuilder.writeLong(timeColumn.getLong(i));
-      resultTsBlockBuilder.declarePosition();
+      builder.declarePosition();
     }
     for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
         columnIndex < columnSize;
         columnIndex++) {
-      appendOneColumn(columnIndex, tsBlock, size);
+      appendOneColumn(columnIndex, tsBlock, size, builder);
     }
   }
 
-  private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) {
-    ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(columnIndex);
+  private static void appendOneColumn(
+      int columnIndex, TsBlock tsBlock, int size, TsBlockBuilder builder) {
+    ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
     Column column = tsBlock.getColumn(columnIndex);
     if (column.mayHaveNull()) {
       for (int i = 0; i < size; i++) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index c998203d7b5..deded9e1bd6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 
 import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
@@ -42,8 +41,6 @@ import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -55,9 +52,10 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 
-public class TableScanOperator extends AbstractDataSourceOperator {
+public class TableScanOperator extends AbstractSeriesScanOperator {
 
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class);
@@ -140,6 +138,8 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
       long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
       long start = System.nanoTime();
 
+      boolean currentDeviceNoMoreData = false;
+
       // here use do-while to promise doing this at least once
       do {
         /*
@@ -148,6 +148,7 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
          * 3. consume next file finally
          */
         if (!readPageData() && !readChunkData() && !readFileData()) {
+          currentDeviceNoMoreData = true;
           break;
         }
 
@@ -156,7 +157,9 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
           && measurementDataBlock == null);
 
       // current device' data is consumed up
-      if (measurementDataBuilder.isEmpty() && measurementDataBlock == null) {
+      if (measurementDataBuilder.isEmpty()
+          && measurementDataBlock == null
+          && currentDeviceNoMoreData) {
         currentDeviceIndex++;
         prepareForNextDevice();
       }
@@ -181,75 +184,19 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
     return checkTsBlockSizeAndGetResult();
   }
 
-  private boolean readFileData() throws IOException {
-    while (seriesScanUtil.hasNextFile()) {
-      if (readChunkData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        appendToBuilder(tsBlock);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void appendToBuilder(TsBlock tsBlock) {
-    int size = tsBlock.getPositionCount();
+  @Override
+  protected void appendToBuilder(TsBlock tsBlock) {
     if (measurementDataBuilder.isEmpty()
         && tsBlock.getPositionCount() >= 
measurementDataBuilder.getMaxTsBlockLineNumber()) {
       measurementDataBlock = tsBlock;
       return;
     }
-    TimeColumnBuilder timeColumnBuilder = 
measurementDataBuilder.getTimeColumnBuilder();
-    TimeColumn timeColumn = tsBlock.getTimeColumn();
-    for (int i = 0; i < size; i++) {
-      timeColumnBuilder.writeLong(timeColumn.getLong(i));
-      measurementDataBuilder.declarePosition();
-    }
-    for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
-        columnIndex < columnSize;
-        columnIndex++) {
-      appendOneColumn(columnIndex, tsBlock, size);
-    }
-  }
-
-  private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) {
-    ColumnBuilder columnBuilder = 
measurementDataBuilder.getColumnBuilder(columnIndex);
-    Column column = tsBlock.getColumn(columnIndex);
-    if (column.mayHaveNull()) {
-      for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
-          columnBuilder.appendNull();
-        } else {
-          columnBuilder.write(column, i);
-        }
-      }
-    } else {
-      for (int i = 0; i < size; i++) {
-        columnBuilder.write(column, i);
-      }
-    }
+    appendDataIntoBuilder(tsBlock, measurementDataBuilder);
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
+  @Override
+  protected void buildResult(TsBlock tsBlock) {
+    throw new UnsupportedOperationException();
   }
 
   private void constructResultTsBlock() {
@@ -309,18 +256,7 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
   @Override
   public long calculateMaxPeekMemory() {
     return (1L + columnsIndexArray.length)
-        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
-        * 3L;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return calculateMaxPeekMemoryWithCounter();
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override

Reply via email to