This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch agg_table_scan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d4defa7e0a8038b77187c1f0639c0c6275b2a2d1
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 10 12:29:53 2024 +0800

    add basic agg table scan impl
---
 .../TableAggregationTableScanOperator.java         | 33 +++++++++--
 .../plan/planner/TableOperatorGenerator.java       | 67 +++++++++++++---------
 .../TableModelTypeProviderExtractor.java           | 16 +++++-
 3 files changed, 82 insertions(+), 34 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index 2793a81325c..6829c9c0fa6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -44,6 +44,7 @@ import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
@@ -52,6 +53,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static java.lang.String.format;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.appendAggregationResult;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.calculateAggregationFromRawData;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
@@ -161,13 +163,14 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   @Override
   public TsBlock next() throws Exception {
     // start stopwatch, reset leftRuntimeOfOneNextCall
+
+    // TODO add maxRunTime optimization
     long start = System.nanoTime();
     // leftRuntimeOfOneNextCall = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     // long maxRuntime = leftRuntimeOfOneNextCall;
-
-    while (
     // System.nanoTime() - start < maxRuntime&&
-    (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
+
+    while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange())
         && !resultTsBlockBuilder.isFull()) {
       if (curTimeRange == null) {
         // move to the next time window
@@ -181,14 +184,33 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       // calculate aggregation result on current time window
       // Keep curTimeRange if the calculation of this timeRange is not done
       if (calculateAggregationResultForCurrentTimeRange()) {
+        updateResultTsBlock();
         curTimeRange = null;
       }
     }
 
     if (resultTsBlockBuilder.getPositionCount() > 0) {
-      TsBlock resultTsBlock = resultTsBlockBuilder.build();
+      int declaredPositions = resultTsBlockBuilder.getPositionCount();
+      ColumnBuilder[] valueColumnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
+      Column[] valueColumns = new Column[valueColumnBuilders.length];
+      for (int i = 0; i < valueColumns.length; i++) {
+        valueColumns[i] = valueColumnBuilders[i].build();
+        if (valueColumns[i].getPositionCount() != declaredPositions) {
+          throw new IllegalStateException(
+              format(
+                  "Declared positions (%s) does not match column %s's number 
of entries (%s)",
+                  declaredPositions, i, valueColumns[i].getPositionCount()));
+        }
+      }
+
+      this.resultTsBlock =
+          new TsBlock(
+              resultTsBlockBuilder.getPositionCount(),
+              new RunLengthEncodedColumn(
+                  TIME_COLUMN_TEMPLATE, 
resultTsBlockBuilder.getPositionCount()),
+              valueColumns);
       resultTsBlockBuilder.reset();
-      return resultTsBlock;
+      return this.resultTsBlock;
     } else {
       return null;
     }
@@ -254,7 +276,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       } else {
         currentDeviceIndex++;
       }
-      updateResultTsBlock();
       if (currentDeviceIndex < deviceCount) {
         // construct AlignedSeriesScanUtil for next device
         this.seriesScanUtil = 
constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index c4e1bc9ee8a..43a4defd960 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -109,6 +109,7 @@ import javax.validation.constraints.NotNull;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -963,7 +964,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return new AggregationOperator(context, child, aggregatorBuilder.build());
   }
 
-  private ImmutableMap<Symbol, Integer> 
makeLayoutFromOutputSymbols(List<Symbol> outputSymbols) {
+  private ImmutableMap<Symbol, Integer> makeLayoutFromOutputSymbols(
+      Collection<Symbol> outputSymbols) {
     ImmutableMap.Builder<Symbol, Integer> outputMappings = 
ImmutableMap.builder();
     int channel = 0;
     for (Symbol symbol : outputSymbols) {
@@ -1007,7 +1009,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return new TableAggregator(
         accumulator,
         step,
-        getTSDataType(typeProvider.getTableModelType(aggregationSymbol)),
+        
getTSDataType(aggregation.getResolvedFunction().getSignature().getReturnType()),
+        // getTSDataType(typeProvider.getTableModelType(aggregationSymbol)),
         argumentChannels,
         OptionalInt.empty());
   }
@@ -1025,8 +1028,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
     List<TableAggregator> aggregators = new ArrayList<>();
 
-    // TODO fix childLayout
-    Map<Symbol, Integer> childLayout = new HashMap<>();
+    // TODO how to use the output symbols of AggregationTableScan?
+    Map<Symbol, Integer> childLayout = 
makeLayoutFromOutputSymbols(node.getAssignments().keySet());
 
     for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : 
node.getAggregations().entrySet()) {
       TableAggregator aggregator =
@@ -1039,7 +1042,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
       aggregators.add(aggregator);
     }
 
-    List<Symbol> outputColumnNames = node.getOutputSymbols();
+    Collection<Symbol> outputColumnNames = node.getAssignments().keySet();
     int outputColumnCount = outputColumnNames.size();
     List<ColumnSchema> columnSchemas = new ArrayList<>(outputColumnCount);
     int[] columnsIndexArray = new int[outputColumnCount];
@@ -1109,28 +1112,40 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     ITimeRangeIterator timeRangeIterator =
         new SingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE);
 
-    return new TableAggregationTableScanOperator(
-        node.getPlanNodeId(),
-        operatorContext,
-        columnSchemas,
-        columnsIndexArray,
-        measurementColumnCount,
-        node.getDeviceEntries(),
-        node.getScanOrder(),
-        scanOptionsBuilder.build(),
-        measurementColumnNames,
-        measurementSchemas,
-        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
-        // TODO if it equals subSensor variable
-        measurementColumnCount,
-        aggregators,
-        timeRangeIterator,
-        false,
-        null,
-        calculateMaxAggregationResultSize(),
-        true);
+    TableAggregationTableScanOperator aggTableScanOperator =
+        new TableAggregationTableScanOperator(
+            node.getPlanNodeId(),
+            operatorContext,
+            columnSchemas,
+            columnsIndexArray,
+            measurementColumnCount,
+            node.getDeviceEntries(),
+            node.getScanOrder(),
+            scanOptionsBuilder.build(),
+            measurementColumnNames,
+            measurementSchemas,
+            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
+            // TODO if it equals subSensor variable
+            measurementColumnCount,
+            aggregators,
+            timeRangeIterator,
+            false,
+            null,
+            calculateMaxAggregationResultSize(),
+            true);
+
+    ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
+
+    for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
+      AlignedFullPath alignedPath =
+          constructAlignedPath(
+              node.getDeviceEntries().get(i), measurementColumnNames, 
measurementSchemas);
+      ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
+    }
+
+    context.getDriverContext().setInputDriver(true);
 
-    // throw new UnsupportedOperationException("Agg-BE not supported");
+    return aggTableScanOperator;
   }
 
   public static long calculateMaxAggregationResultSize(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index a78f8695b07..cabcdffd869 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
@@ -73,6 +74,12 @@ public class TableModelTypeProviderExtractor {
 
     @Override
     public Void visitPlan(PlanNode node, Void context) {
+      addOutputSymbolsToTypeProvider(node);
+      node.getChildren().forEach(child -> child.accept(this, context));
+      return null;
+    }
+
+    private void addOutputSymbolsToTypeProvider(PlanNode node) {
       for (Symbol symbol : node.getOutputSymbols()) {
         if (!feTypeProvider.isSymbolExist(symbol)) {
           throw new IllegalStateException(
@@ -82,8 +89,6 @@ public class TableModelTypeProviderExtractor {
         }
         beTypeProvider.putTableModelType(symbol, 
feTypeProvider.getTableModelType(symbol));
       }
-      node.getChildren().forEach(child -> child.accept(this, context));
-      return null;
     }
 
     @Override
@@ -102,6 +107,13 @@ public class TableModelTypeProviderExtractor {
       return null;
     }
 
+    @Override
+    public Void visitAggregationTableScan(AggregationTableScanNode node, Void 
context) {
+      addOutputSymbolsToTypeProvider(node);
+      node.getAssignments().forEach((k, v) -> 
beTypeProvider.putTableModelType(k, v.getType()));
+      return null;
+    }
+
     @Override
     public Void visitTableScan(TableScanNode node, Void context) {
       node.getAssignments().forEach((k, v) -> 
beTypeProvider.putTableModelType(k, v.getType()));

Reply via email to