This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch perfect_tableagg
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 622e3a5c16ba61e4fc62a55159fc8b5e6ad8eb58
Author: Beyyes <[email protected]>
AuthorDate: Fri Dec 20 00:29:33 2024 +0800

    perfect TableAggregationTableScanOperator
---
 .../TableAggregationTableScanOperator.java         | 159 ++++++-------
 .../plan/planner/TableOperatorGenerator.java       | 251 ++++++++++-----------
 2 files changed, 192 insertions(+), 218 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e963963bf7d..8ef9ddbbf04 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -24,7 +24,7 @@ import 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow;
@@ -49,7 +49,6 @@ import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.BinaryColumn;
-import org.apache.tsfile.read.common.block.column.LongColumn;
 import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
@@ -68,120 +67,97 @@ import java.util.stream.Collectors;
 import static java.lang.String.format;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
 import static 
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 
-public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregationScanOperator {
+public class TableAggregationTableScanOperator extends 
AbstractDataSourceOperator {
 
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class);
 
-  private final List<TableAggregator> tableAggregators;
+  private boolean finished = false;
+  private TsBlock inputTsBlock;
 
+  private final List<TableAggregator> tableAggregators;
   private final List<ColumnSchema> groupingKeySchemas;
   private final int[] groupingKeyIndex;
 
-  public static final LongColumn TIME_COLUMN_TEMPLATE =
-      new LongColumn(1, Optional.empty(), new long[] {0});
-
-  private final List<ColumnSchema> columnSchemas;
-
-  private final int[] columnsIndexArray;
-
   private final List<DeviceEntry> deviceEntries;
-
   private final int deviceCount;
-
-  private final Ordering scanOrder;
-  private final SeriesScanOptions seriesScanOptions;
-
+  private int currentDeviceIndex;
   private final List<String> measurementColumnNames;
   private final Set<String> allSensors;
-
   private final List<IMeasurementSchema> measurementSchemas;
-
   private final List<TSDataType> measurementColumnTSDataTypes;
+  private final int measurementCount;
 
-  // TODO calc maxTsBlockLineNum using date_bin
-  private final int maxTsBlockLineNum;
+  private final List<ColumnSchema> aggColumnSchemas;
+  private final int[] aggColumnsIndexArray;
 
-  // for different aggregations aiming to same column, use this variable to 
point to same column
-  private final List<Integer> aggArguments;
+  private final SeriesScanOptions seriesScanOptions;
+  private final boolean ascending;
+  private final Ordering scanOrder;
+  // Some special data types(like BLOB) cannot use statistics
+  protected final boolean canUseStatistics;
+  private final long cachedRawDataSize;
 
-  private QueryDataSource queryDataSource;
+  // stores all inputChannels of tableAggregators,
+  // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0]
+  private final List<Integer> aggregatorInputChannels;
 
-  private int currentDeviceIndex;
+  private QueryDataSource queryDataSource;
 
-  ITableTimeRangeIterator timeIterator;
+  private final ITableTimeRangeIterator timeIterator;
 
   private boolean allAggregatorsHasFinalResult = false;
 
   public TableAggregationTableScanOperator(
       PlanNodeId sourceId,
       OperatorContext context,
-      List<ColumnSchema> columnSchemas,
-      int[] columnsIndexArray,
+      List<ColumnSchema> aggColumnSchemas,
+      int[] aggColumnsIndexArray,
       List<DeviceEntry> deviceEntries,
-      Ordering scanOrder,
       SeriesScanOptions seriesScanOptions,
       List<String> measurementColumnNames,
       Set<String> allSensors,
       List<IMeasurementSchema> measurementSchemas,
-      int maxTsBlockLineNum,
-      int measurementCount,
       List<TableAggregator> tableAggregators,
       List<ColumnSchema> groupingKeySchemas,
       int[] groupingKeyIndex,
       ITableTimeRangeIterator tableTimeRangeIterator,
       boolean ascending,
-      long maxReturnSize,
       boolean canUseStatistics,
-      List<Integer> aggArguments) {
-
-    super(
-        sourceId,
-        context,
-        null,
-        measurementCount,
-        null,
-        null,
-        ascending,
-        false,
-        null,
-        maxReturnSize,
-        (1L + measurementCount) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
-        canUseStatistics);
+      List<Integer> aggregatorInputChannels) {
 
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.canUseStatistics = canUseStatistics;
     this.tableAggregators = tableAggregators;
     this.groupingKeySchemas = groupingKeySchemas;
     this.groupingKeyIndex = groupingKeyIndex;
-
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.columnSchemas = columnSchemas;
-    this.columnsIndexArray = columnsIndexArray;
+    this.aggColumnSchemas = aggColumnSchemas;
+    this.aggColumnsIndexArray = aggColumnsIndexArray;
     this.deviceEntries = deviceEntries;
     this.deviceCount = deviceEntries.size();
     this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
-    this.scanOrder = scanOrder;
+    this.ascending = ascending;
+    this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
     this.seriesScanOptions = seriesScanOptions;
     this.measurementColumnNames = measurementColumnNames;
+    this.measurementCount = measurementColumnNames.size();
+    this.cachedRawDataSize =
+        (1L + this.measurementCount)
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
     this.allSensors = allSensors;
     this.measurementSchemas = measurementSchemas;
     this.measurementColumnTSDataTypes =
         
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
     this.currentDeviceIndex = 0;
     this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(0));
-    this.aggArguments = aggArguments;
+    this.aggregatorInputChannels = aggregatorInputChannels;
     this.timeIterator = tableTimeRangeIterator;
-    if (tableTimeRangeIterator.getType()
-        == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
-      curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
-    }
-
-    this.maxReturnSize = maxReturnSize;
-    this.maxTsBlockLineNum = maxTsBlockLineNum;
 
     constructAlignedSeriesScanUtil();
   }
@@ -192,9 +168,23 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       finished = !hasNextWithTimer();
     }
     return finished;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return cachedRawDataSize + maxReturnSize;
+  }
 
-    //    return (retainedTsBlock == null)
-    //        && (currentDeviceIndex >= deviceCount || 
seriesScanOptions.limitConsumedUp());
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
+        ? cachedRawDataSize
+        : 0;
   }
 
   @Override
@@ -231,7 +221,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       // return true if current time window is calc finished
       if (calculateAggregationResultForCurrentTimeRange()) {
         timeIterator.resetCurTimeRange();
-        // curTimeRange = null;
       }
     }
 
@@ -266,7 +255,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     return resultTsBlock;
   }
 
-  private void constructAlignedSeriesScanUtil() {
+  protected void constructAlignedSeriesScanUtil() {
     DeviceEntry deviceEntry;
 
     if (this.deviceEntries.isEmpty() || 
this.deviceEntries.get(this.currentDeviceIndex) == null) {
@@ -290,7 +279,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   }
 
   /** Return true if we have the result of this timeRange. */
-  @Override
   protected boolean calculateAggregationResultForCurrentTimeRange() {
     try {
       if (calcFromCachedData()) {
@@ -353,14 +341,12 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     }
   }
 
-  @Override
   protected void updateResultTsBlock() {
     appendAggregationResult(resultTsBlockBuilder, tableAggregators);
     // after appendAggregationResult invoked, aggregators must be cleared
     resetTableAggregators();
   }
 
-  @Override
   protected boolean calcFromCachedData() {
     return calcUsingRawData(inputTsBlock);
   }
@@ -420,13 +406,13 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     }
 
     TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
-    Column[] valueColumns = new Column[aggArguments.size()];
-    for (int idx : aggArguments) {
+    Column[] valueColumns = new Column[aggregatorInputChannels.size()];
+    for (int idx : aggregatorInputChannels) {
       if (valueColumns[idx] != null) {
         continue;
       }
       valueColumns[idx] =
-          buildValueColumn(columnSchemas.get(idx).getColumnCategory(), 
inputRegion, idx);
+          buildValueColumn(aggColumnSchemas.get(idx).getColumnCategory(), 
inputRegion, idx);
     }
 
     TsBlock tsBlock =
@@ -463,7 +449,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
             (String)
                 deviceEntries
                     .get(currentDeviceIndex)
-                    .getNthSegment(columnsIndexArray[columnIdx] + 1);
+                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
         return getIdOrAttrColumn(
             inputRegion.getTimeColumn().getPositionCount(),
             id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
@@ -472,10 +458,10 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
             deviceEntries
                 .get(currentDeviceIndex)
                 .getAttributeColumnValues()
-                .get(columnsIndexArray[columnIdx]);
+                .get(aggColumnsIndexArray[columnIdx]);
         return 
getIdOrAttrColumn(inputRegion.getTimeColumn().getPositionCount(), attr);
       case MEASUREMENT:
-        return inputRegion.getColumn(columnsIndexArray[columnIdx]);
+        return inputRegion.getColumn(aggColumnsIndexArray[columnIdx]);
       default:
         throw new IllegalStateException("Unsupported column type: " + 
columnSchemaCategory);
     }
@@ -507,10 +493,13 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
         idx++;
 
         TsTableColumnCategory columnSchemaCategory =
-            columnSchemas.get(aggArguments.get(idx)).getColumnCategory();
+            
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
         statisticsArray[i] =
             buildStatistics(
-                columnSchemaCategory, timeStatistics, valueStatistics, 
aggArguments.get(idx));
+                columnSchemaCategory,
+                timeStatistics,
+                valueStatistics,
+                aggregatorInputChannels.get(idx));
       }
 
       aggregator.processStatistics(statisticsArray);
@@ -531,7 +520,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
             (String)
                 deviceEntries
                     .get(currentDeviceIndex)
-                    .getNthSegment(columnsIndexArray[columnIdx] + 1);
+                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
         return getStatistics(
             timeStatistics, id == null ? null : new Binary(id, 
TSFileConfig.STRING_CHARSET));
       case ATTRIBUTE:
@@ -539,10 +528,10 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
             deviceEntries
                 .get(currentDeviceIndex)
                 .getAttributeColumnValues()
-                .get(columnsIndexArray[columnIdx]);
+                .get(aggColumnsIndexArray[columnIdx]);
         return getStatistics(timeStatistics, attr);
       case MEASUREMENT:
-        return valueStatistics[columnsIndexArray[columnIdx]];
+        return valueStatistics[aggColumnsIndexArray[columnIdx]];
       default:
         throw new IllegalStateException("Unsupported column type: " + 
columnSchemaCategory);
     }
@@ -562,7 +551,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   }
 
   @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
-  @Override
   public boolean readAndCalcFromFile() throws IOException {
     // start stopwatch
     long start = System.nanoTime();
@@ -585,8 +573,8 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
         if (timeIterator
             .getCurTimeRange()
             .contains(fileTimeStatistics.getStartTime(), 
fileTimeStatistics.getEndTime())) {
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
+          Statistics[] statisticsList = new Statistics[measurementCount];
+          for (int i = 0; i < measurementCount; i++) {
             statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
           }
           calcFromStatistics(fileTimeStatistics, statisticsList);
@@ -632,8 +620,8 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
             .getCurTimeRange()
             .contains(chunkTimeStatistics.getStartTime(), 
chunkTimeStatistics.getEndTime())) {
           // calc from chunkMetaData
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
+          Statistics[] statisticsList = new Statistics[measurementCount];
+          for (int i = 0; i < measurementCount; i++) {
             statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
           }
           calcFromStatistics(chunkTimeStatistics, statisticsList);
@@ -681,8 +669,8 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
           if (timeIterator
               .getCurTimeRange()
               .contains(pageTimeStatistics.getStartTime(), 
pageTimeStatistics.getEndTime())) {
-            Statistics[] statisticsList = new Statistics[subSensorSize];
-            for (int i = 0; i < subSensorSize; i++) {
+            Statistics[] statisticsList = new Statistics[measurementCount];
+            for (int i = 0; i < measurementCount; i++) {
               statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
             }
             calcFromStatistics(pageTimeStatistics, statisticsList);
@@ -866,7 +854,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     this.queryDataSource = (QueryDataSource) dataSource;
     this.seriesScanUtil.initQueryDataSource(queryDataSource);
     this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
-    this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7dcdbe04c95..3873973ca85 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -378,21 +378,18 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
       }
     }
 
-    SeriesScanOptions.Builder scanOptionsBuilder =
-        node.getTimePredicate().isPresent()
-            ? getSeriesScanOptionsBuilder(context, 
node.getTimePredicate().get())
-            : new SeriesScanOptions.Builder();
-    scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
-    scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-    
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
-    scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
-
-    Expression pushDownPredicate = node.getPushDownPredicate();
-    if (pushDownPredicate != null) {
-      scanOptionsBuilder.withPushDownFilter(
-          convertPredicateToFilter(
-              pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, 
timeColumnName));
-    }
+    SeriesScanOptions seriesScanOptions =
+        buildSeriesScanOptions(
+            context,
+            columnSchemaMap,
+            measurementColumnNames,
+            measurementColumnsIndexMap,
+            timeColumnName,
+            node.getTimePredicate(),
+            node.getPushDownLimit(),
+            node.getPushDownOffset(),
+            node.isPushLimitToEachDevice(),
+            node.getPushDownPredicate());
 
     OperatorContext operatorContext =
         context
@@ -421,7 +418,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             columnsIndexArray,
             node.getDeviceEntries(),
             node.getScanOrder(),
-            scanOptionsBuilder.build(),
+            seriesScanOptions,
             measurementColumnNames,
             allSensors,
             measurementSchemas,
@@ -1748,74 +1745,71 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
   public Operator visitAggregationTableScan(
       AggregationTableScanNode node, LocalExecutionPlanContext context) {
 
+    List<String> measurementColumnNames = new ArrayList<>();
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    Map<String, Integer> measurementColumnsIndexMap = new HashMap<>();
+
     List<TableAggregator> aggregators = new 
ArrayList<>(node.getAggregations().size());
-    Map<Symbol, Integer> columnLayout = new 
HashMap<>(node.getAggregations().size());
+    List<Integer> aggregatorInputChannels =
+        new ArrayList<>(
+            (int)
+                node.getAggregations().values().stream()
+                    .mapToLong(aggregation -> 
aggregation.getArguments().size())
+                    .sum());
+    int aggDistinctArgumentCount =
+        (int)
+            node.getAggregations().values().stream()
+                .flatMap(aggregation -> aggregation.getArguments().stream())
+                .map(Symbol::from)
+                .distinct()
+                .count();
+    List<ColumnSchema> aggColumnSchemas = new 
ArrayList<>(aggDistinctArgumentCount);
+    Map<Symbol, Integer> aggColumnLayout = new 
HashMap<>(aggDistinctArgumentCount);
+    int[] aggColumnsIndexArray = new int[aggDistinctArgumentCount];
 
-    int distinctArgumentCount = node.getAssignments().size();
-    int aggregationsCount = node.getAggregations().size();
-    List<Integer> aggColumnIndexes = new ArrayList<>();
+    String timeColumnName = null;
     int channel = 0;
-    int idx = -1;
     int measurementColumnCount = 0;
-    Map<Symbol, Integer> idAndAttributeColumnsIndexMap = 
node.getIdAndAttributeIndexMap();
-    Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments();
-    List<ColumnSchema> columnSchemas = new ArrayList<>(aggregationsCount);
-    int[] columnsIndexArray = new int[distinctArgumentCount];
-    List<String> measurementColumnNames = new ArrayList<>();
-    Map<String, Integer> measurementColumnsIndexMap = new HashMap<>();
-    String timeColumnName = null;
-    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-
     for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : 
node.getAggregations().entrySet()) {
-      AggregationNode.Aggregation aggregation = entry.getValue();
-
-      for (Expression argument : aggregation.getArguments()) {
-        idx++;
+      for (Expression argument : entry.getValue().getArguments()) {
         Symbol symbol = Symbol.from(argument);
-        ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol), 
symbol + " is null");
-        switch (schema.getColumnCategory()) {
-          case ID:
-          case ATTRIBUTE:
-            if (!columnLayout.containsKey(symbol)) {
-              columnsIndexArray[channel] =
-                  requireNonNull(idAndAttributeColumnsIndexMap.get(symbol), 
symbol + " is null");
-              columnSchemas.add(schema);
-            }
-            break;
-          case MEASUREMENT:
-            if (!columnLayout.containsKey(symbol)) {
-              columnsIndexArray[channel] = measurementColumnCount;
+        ColumnSchema schema =
+            requireNonNull(node.getAssignments().get(symbol), symbol + " is 
null");
+        if (!aggColumnLayout.containsKey(symbol)) {
+          switch (schema.getColumnCategory()) {
+            case ID:
+            case ATTRIBUTE:
+              aggColumnsIndexArray[channel] =
+                  requireNonNull(node.getIdAndAttributeIndexMap().get(symbol), 
symbol + " is null");
+              break;
+            case MEASUREMENT:
+              aggColumnsIndexArray[channel] = measurementColumnCount;
               measurementColumnCount++;
               measurementColumnNames.add(schema.getName());
               measurementSchemas.add(
                   new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
-              columnSchemas.add(schema);
               measurementColumnsIndexMap.put(symbol.getName(), 
measurementColumnCount - 1);
-            }
-            break;
-          case TIME:
-            if (!columnLayout.containsKey(symbol)) {
-              columnsIndexArray[channel] = -1;
-              columnSchemas.add(schema);
+              break;
+            case TIME:
+              aggColumnsIndexArray[channel] = -1;
               timeColumnName = symbol.getName();
-            }
-            break;
-          default:
-            throw new IllegalArgumentException(
-                "Unexpected column category: " + schema.getColumnCategory());
-        }
+              break;
+            default:
+              throw new IllegalArgumentException(
+                  "Unexpected column category: " + schema.getColumnCategory());
+          }
 
-        if (!columnLayout.containsKey(symbol)) {
-          aggColumnIndexes.add(channel);
-          columnLayout.put(symbol, channel++);
+          aggColumnSchemas.add(schema);
+          aggregatorInputChannels.add(channel);
+          aggColumnLayout.put(symbol, channel++);
         } else {
-          aggColumnIndexes.add(columnLayout.get(symbol));
+          aggregatorInputChannels.add(aggColumnLayout.get(symbol));
         }
       }
     }
 
     for (Map.Entry<Symbol, ColumnSchema> entry : 
node.getAssignments().entrySet()) {
-      if (!columnLayout.containsKey(entry.getKey())
+      if (!aggColumnLayout.containsKey(entry.getKey())
           && entry.getValue().getColumnCategory() == MEASUREMENT) {
         measurementColumnCount++;
         measurementColumnNames.add(entry.getValue().getName());
@@ -1835,7 +1829,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : 
node.getAggregations().entrySet()) {
       aggregators.add(
           buildAggregator(
-              columnLayout,
+              aggColumnLayout,
               entry.getKey(),
               entry.getValue(),
               node.getStep(),
@@ -1853,9 +1847,9 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
       for (int i = 0; i < node.getGroupingKeys().size(); i++) {
         Symbol groupingKey = node.getGroupingKeys().get(i);
 
-        if (idAndAttributeColumnsIndexMap.containsKey(groupingKey)) {
-          groupingKeySchemas.add(columnSchemaMap.get(groupingKey));
-          groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey);
+        if (node.getIdAndAttributeIndexMap().containsKey(groupingKey)) {
+          groupingKeySchemas.add(node.getAssignments().get(groupingKey));
+          groupingKeyIndex[i] = 
node.getIdAndAttributeIndexMap().get(groupingKey);
         } else {
           if (node.getProjection() != null
               && !node.getProjection().getMap().isEmpty()
@@ -1896,62 +1890,81 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 TableAggregationTableScanOperator.class.getSimpleName());
-    SeriesScanOptions.Builder scanOptionsBuilder =
-        node.getTimePredicate().isPresent()
-            ? getSeriesScanOptionsBuilder(context, 
node.getTimePredicate().get())
-            : new SeriesScanOptions.Builder();
-    scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
-    scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-    
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
-    scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
-    Expression pushDownPredicate = node.getPushDownPredicate();
-    if (pushDownPredicate != null) {
-      scanOptionsBuilder.withPushDownFilter(
-          convertPredicateToFilter(
-              pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, 
timeColumnName));
-    }
+    SeriesScanOptions seriesScanOptions =
+        buildSeriesScanOptions(
+            context,
+            node.getAssignments(),
+            measurementColumnNames,
+            measurementColumnsIndexMap,
+            timeColumnName,
+            node.getTimePredicate(),
+            node.getPushDownLimit(),
+            node.getPushDownOffset(),
+            node.isPushLimitToEachDevice(),
+            node.getPushDownPredicate());
 
     Set<String> allSensors = new HashSet<>(measurementColumnNames);
-    // for time column
-    allSensors.add("");
+    allSensors.add(""); // for time column
+
+    for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+      AlignedFullPath alignedPath =
+          constructAlignedPath(
+              node.getDeviceEntries().get(i),
+              measurementColumnNames,
+              measurementSchemas,
+              allSensors);
+      ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
+    }
+
+    context.getDriverContext().setInputDriver(true);
+
     TableAggregationTableScanOperator aggTableScanOperator =
         new TableAggregationTableScanOperator(
             node.getPlanNodeId(),
             operatorContext,
-            columnSchemas,
-            columnsIndexArray,
+            aggColumnSchemas,
+            aggColumnsIndexArray,
             node.getDeviceEntries(),
-            scanAscending ? Ordering.ASC : Ordering.DESC,
-            scanOptionsBuilder.build(),
+            seriesScanOptions,
             measurementColumnNames,
             allSensors,
             measurementSchemas,
-            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
-            measurementColumnCount,
             aggregators,
             groupingKeySchemas,
             groupingKeyIndex,
             timeRangeIterator,
             scanAscending,
-            calculateMaxAggregationResultSize(),
             canUseStatistic,
-            aggColumnIndexes);
-
+            aggregatorInputChannels);
     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
+    return aggTableScanOperator;
+  }
 
-    for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
-      AlignedFullPath alignedPath =
-          constructAlignedPath(
-              node.getDeviceEntries().get(i),
-              measurementColumnNames,
-              measurementSchemas,
-              allSensors);
-      ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
+  private SeriesScanOptions buildSeriesScanOptions(
+      LocalExecutionPlanContext context,
+      Map<Symbol, ColumnSchema> columnSchemaMap,
+      List<String> measurementColumnNames,
+      Map<String, Integer> measurementColumnsIndexMap,
+      String timeColumnName,
+      Optional<Expression> timePredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice,
+      Expression pushDownPredicate) {
+    SeriesScanOptions.Builder scanOptionsBuilder =
+        timePredicate
+            .map(expression -> getSeriesScanOptionsBuilder(context, 
expression))
+            .orElseGet(SeriesScanOptions.Builder::new);
+    scanOptionsBuilder.withPushDownLimit(pushDownLimit);
+    scanOptionsBuilder.withPushDownOffset(pushDownOffset);
+    scanOptionsBuilder.withPushLimitToEachDevice(pushLimitToEachDevice);
+    scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
+    if (pushDownPredicate != null) {
+      scanOptionsBuilder.withPushDownFilter(
+          convertPredicateToFilter(
+              pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, 
timeColumnName));
     }
-
-    context.getDriverContext().setInputDriver(true);
-
-    return aggTableScanOperator;
+    return scanOptionsBuilder.build();
   }
 
   @Override
@@ -2035,30 +2048,4 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     }
     return new boolean[] {canUseStatistic, isAscending};
   }
-
-  public static long calculateMaxAggregationResultSize(
-      // List<? extends AggregationDescriptor> aggregationDescriptors,
-      // ITimeRangeIterator timeRangeIterator
-      ) {
-    // TODO perfect max aggregation result size logic
-    return 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
-    //    long timeValueColumnsSizePerLine = 
TimeColumn.SIZE_IN_BYTES_PER_POSITION;
-    //    for (AggregationDescriptor descriptor : aggregationDescriptors) {
-    //      List<TSDataType> outPutDataTypes =
-    //              descriptor.getOutputColumnNames().stream()
-    //                      .map(typeProvider::getTableModelType)
-    //                      .collect(Collectors.toList());
-    //      for (TSDataType tsDataType : outPutDataTypes) {
-    //        timeValueColumnsSizePerLine += 
getOutputColumnSizePerLine(tsDataType);
-    //      }
-    //    }
-    //
-    //    return Math.min(
-    //            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
-    //            Math.min(
-    //                    
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
-    //                    timeRangeIterator.getTotalIntervalNum())
-    //                    * timeValueColumnsSizePerLine);
-  }
 }

Reply via email to