This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04ccdace529f6606877f5f3b676045eafcb6a6b9
Author: Beyyes <[email protected]>
AuthorDate: Sat Dec 21 18:33:07 2024 +0800

    add impl
---
 .../source/relational/TableLastQueryOperator.java  | 396 +++++++++++++--------
 .../relational/aggregation/LastByAccumulator.java  |   4 +
 .../relational/aggregation/TableAggregator.java    |   4 +
 .../plan/planner/TableOperatorGenerator.java       |  14 +-
 4 files changed, 271 insertions(+), 147 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
index 1489812105d..30aa2933b0f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
@@ -24,6 +24,9 @@ import 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByAccumulator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -31,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -54,9 +58,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +70,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.Utils.serializeTimeValue;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 
 public class TableLastQueryOperator extends TableAggregationTableScanOperator {
@@ -75,40 +82,32 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
       TableDeviceSchemaCache.getInstance();
 
   private boolean finished = false;
-
-  private final QualifiedObjectName qualifiedObjectName;
-  private int lastQueryDeviceIndex;
-
-  private QueryDataSource queryDataSource;
-
-  private SeriesScanOptions initSeriesScanOptions;
+  private boolean fetchLastCacheForCurrentDevice;
+  private int outputDeviceIndex;
+  private DeviceEntry currentDeviceEntry;
+  private TableAggregationTableScanOperator aggTableScanOperator;
+  private Map<String, Integer> aggColumnLayout;
+  private int newChannelCnt = 0;
+  private List<Integer> newAggColumnsIndexArray;
+  private final List<Integer> initLastByInputChannels = Arrays.asList(0, -1, 
-1);
+  private final List<Integer> initLastInputChannels = Arrays.asList(0, -1);
+  // the ordinal of uncached columns in initTableAggregators
+  private List<Integer> unCachedMeasurementToAggregatorIndex = new 
ArrayList<>();
+  private final String dbName;
   // last_by(x,time) or last(time)
-  private final boolean[] isLastBy;
-  private boolean hashLastBy;
-  private boolean calcCacheForCurrentDevice;
+  private final boolean[] isLastByArray;
 
+  private QueryDataSource queryDataSource;
   private final List<DeviceEntry> initDeviceEntries;
   private final List<TableAggregator> initTableAggregators;
-  private final List<String> initMeasurementColumnNames;
-  private final Set<String> initAllSensors;
-  private final List<IMeasurementSchema> initMeasurementSchemas;
   private final List<ColumnSchema> initAggColumnSchemas;
   private final int[] initAggColumnsIndexArray;
-
-  // stores all inputChannels of tableAggregators,
-  // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels 
should be [0, 1, 0]
   private final List<Integer> initAggregatorInputChannels;
-
+  private final List<String> initMeasurementColumnNames;
+  private final List<IMeasurementSchema> initMeasurementSchemas;
+  private SeriesScanOptions initSeriesScanOptions;
   private final int groupKeySize;
 
-  DeviceEntry currentDeviceEntry;
-
-  List<Integer> cacheToAggregatorIndex = new ArrayList<>();
-
-  TableAggregationTableScanOperator aggTableScanOperator;
-
-  Map<String, Integer> aggColumnLayout;
-
   public TableLastQueryOperator(
       PlanNodeId sourceId,
       OperatorContext context,
@@ -149,22 +148,20 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
     this.initTableAggregators = tableAggregators;
     this.initAggregatorInputChannels = aggregatorInputChannels;
     this.initDeviceEntries = deviceEntries;
-
     this.initAggColumnSchemas = aggColumnSchemas;
     this.initAggColumnsIndexArray = aggColumnsIndexArray;
-
     this.initMeasurementColumnNames = measurementColumnNames;
     this.initMeasurementSchemas = measurementSchemas;
-
     this.initSeriesScanOptions = seriesScanOptions;
-    this.initAllSensors = allSensors;
-
-    this.isLastBy = new boolean[tableAggregators.size()];
-    // change it later
-    Arrays.fill(isLastBy, true);
-    this.qualifiedObjectName = qualifiedObjectName;
+    this.dbName = qualifiedObjectName.getDatabaseName();
+    this.groupKeySize = groupingKeySchemas == null ? 0 : 
groupingKeySchemas.size();
 
-    groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size();
+    this.isLastByArray = new boolean[tableAggregators.size()];
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      if (tableAggregators.get(i).getAccumulator() instanceof 
LastByAccumulator) {
+        isLastByArray[i] = true;
+      }
+    }
   }
 
   @Override
@@ -181,7 +178,7 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
       return true;
     }
 
-    return lastQueryDeviceIndex < initDeviceEntries.size();
+    return outputDeviceIndex < initDeviceEntries.size();
   }
 
   @Override
@@ -193,9 +190,9 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
       return getResultFromRetainedTsBlock();
     }
 
-    while (System.nanoTime() - start > maxRuntime
+    while (System.nanoTime() - start < maxRuntime
         && !resultTsBlockBuilder.isFull()
-        && lastQueryDeviceIndex < initDeviceEntries.size()) {
+        && outputDeviceIndex < initDeviceEntries.size()) {
       processCurrentDevice();
     }
 
@@ -209,79 +206,76 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
 
   /** Main process logic, calc the last aggregation results of current device. 
*/
   private void processCurrentDevice() throws Exception {
+    currentDeviceEntry = initDeviceEntries.get(outputDeviceIndex);
 
-    // TODO need lock?
-    currentDeviceEntry = initDeviceEntries.get(lastQueryDeviceIndex);
-
-    if (!calcCacheForCurrentDevice) {
-      resetArguments();
-
-      int channelCnt = 0;
+    if (!fetchLastCacheForCurrentDevice) {
+      resetAggArguments();
 
+      int initChannel = 0;
       for (int i = 0; i < initTableAggregators.size(); i++) {
         TableAggregator aggregator = initTableAggregators.get(i);
-        if (isLastBy[i]) {
-          processLastBy(i, channelCnt);
+        if (isLastByArray[i]) {
+          processLastBy(i, initChannel);
         } else {
-          processLast(channelCnt);
+          processLast(i, initChannel);
         }
-
-        channelCnt += aggregator.getChannelCount();
+        initChannel += aggregator.getChannelCount();
       }
 
-      calcCacheForCurrentDevice = true;
+      fetchLastCacheForCurrentDevice = true;
     }
 
-    if (aggColumnSchemas != null && !aggColumnSchemas.isEmpty()) {
-      //      if (aggTableScanOperator == null) {
-      //        // always add last(time) to tail, update last cache need the 
value of last(time)
-      //        tableAggregators.add(
-      //            new TableAggregator(
-      //                new LastDescAccumulator(TSDataType.TIMESTAMP),
-      //                tableAggregators.get(0).getStep(),
-      //                TSDataType.TIMESTAMP,
-      //                Arrays.asList(aggColumnLayout.get("time"), 
aggColumnLayout.get("time")),
-      //                OptionalInt.empty()));
-      //        aggTableScanOperator =
-      //            new TableAggregationTableScanOperator(
-      //                sourceId,
-      //                operatorContext,
-      //                aggColumnSchemas,
-      //                aggColumnsIndexArray,
-      //                Collections.singletonList(currentDeviceEntry),
-      //                seriesScanOptions,
-      //                measurementColumnNames,
-      //                allSensors,
-      //                measurementSchemas,
-      //                tableAggregators,
-      //                Collections.emptyList(),
-      //                null,
-      //                timeIterator,
-      //                false,
-      //                canUseStatistics,
-      //                aggregatorInputChannels);
-      //      }
-
-      //      if (calculateAggregationResultForCurrentTimeRange()) {
-      //
-      //      }
-
-      if (calculateAggregationResultForCurrentTimeRange()) {
-        //        TsBlock updateBlock = aggTableScanOperator.next();
-        //        if (updateBlock != null && !updateBlock.isEmpty()) {
-        String dbName = qualifiedObjectName.getDatabaseName();
+    if (hasUnCachedColumns()) {
+      if (aggTableScanOperator == null) {
+        buildAggTableScanArguments();
+      }
+
+      if (!aggTableScanOperator.hasNext()) {
+        for (int i = 0; i < tableAggregators.size() - 1; i++) {
+          resultTsBlockBuilder
+              .getValueColumnBuilders()[groupKeySize + 
unCachedMeasurementToAggregatorIndex.get(i)]
+              .appendNull();
+        }
+        outputDeviceIndex++;
+        fetchLastCacheForCurrentDevice = false;
+        appendToResultTsBlockBuilder();
+        return;
+      }
 
+      TsBlock updateBlock = aggTableScanOperator.next();
+      if (updateBlock != null && !updateBlock.isEmpty()) {
+        int channel = 0;
         List<String> updateMeasurementList = new ArrayList<>();
         List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
-        for (int i = 0; i < tableAggregators.size(); i++) {
-          //            if (!isLastBy[i] || (isLastBy[i] && 
!updateBlock.getColumn(i).isNull(0))) {
-          //              updateMeasurementList.add("");
-          //              TimeValuePair tv =
-          //                  new TimeValuePair(
-          //                      updateBlock.getColumn(0).getLong(0),
-          //                      
updateBlock.getColumn(i).getTsPrimitiveType(0));
-          //              updateTimeValuePairList.add(tv);
-          //            }
+        for (int i = 0; i < tableAggregators.size() - 1; i++) {
+          if (updateBlock.getColumn(i).isNull(0)) {
+            resultTsBlockBuilder
+                .getValueColumnBuilders()[
+                groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)]
+                .appendNull();
+          } else {
+            resultTsBlockBuilder
+                .getValueColumnBuilders()[
+                groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)]
+                .writeBinary(updateBlock.getColumn(i).getBinary(0));
+          }
+
+          boolean isLastBy =
+              tableAggregators.get(i).getAccumulator() instanceof 
LastByDescAccumulator;
+
+          if (!isLastBy || !updateBlock.getColumn(i).isNull(0)) {
+            ColumnSchema schema = 
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+            if (schema.getColumnCategory() == TsTableColumnCategory.TIME
+                || schema.getColumnCategory() == 
TsTableColumnCategory.MEASUREMENT) {
+              updateMeasurementList.add(
+                  schema.getColumnCategory() == TsTableColumnCategory.TIME ? 
"" : schema.getName());
+              TimeValuePair tv =
+                  new TimeValuePair(
+                      updateBlock.getColumn(tableAggregators.size() - 
1).getLong(0),
+                      updateBlock.getColumn(i).getTsPrimitiveType(0));
+              updateTimeValuePairList.add(tv);
+            }
+          }
         }
         String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
         TimeValuePair[] updateTimeValuePairArray =
@@ -295,28 +289,22 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
             updateMeasurementArray,
             updateTimeValuePairArray);
 
-        aggTableScanOperator.close();
-        aggTableScanOperator = null;
-        lastQueryDeviceIndex++;
+        outputDeviceIndex++;
+        fetchLastCacheForCurrentDevice = false;
         appendToResultTsBlockBuilder();
-        resetArguments();
       }
     }
-    //    }
   }
 
-  private void processLastBy(int aggregatorNum, int initChannel) {
+  private void processLastBy(int aggregatorNum, int channelNum) {
     TableAggregator aggregator = initTableAggregators.get(aggregatorNum);
-    ColumnSchema schema = initAggColumnSchemas.get(initChannel);
+    ColumnSchema schema = 
initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum));
     String columnName = schema.getName();
     if (schema.getColumnCategory() == TsTableColumnCategory.TIME
         || schema.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) {
       Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
           TABLE_DEVICE_SCHEMA_CACHE.getLastRow(
-              qualifiedObjectName.getDatabaseName(),
-              currentDeviceEntry.getDeviceID(),
-              "",
-              Collections.singletonList(columnName));
+              dbName, currentDeviceEntry.getDeviceID(), "", 
Collections.singletonList(columnName));
 
       if (lastByResult.isPresent() && lastByResult.get().getRight()[0] != 
null) {
         TsPrimitiveType timeValuePair = lastByResult.get().getRight()[0];
@@ -341,30 +329,38 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
         return;
       }
 
-      tableAggregators.add(aggregator);
-      cacheToAggregatorIndex.add(aggregatorNum);
+      unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
     }
 
+    TableAggregator newAggregator =
+        new TableAggregator(
+            aggregator.getAccumulator(),
+            aggregator.getStep(),
+            aggregator.getType(),
+            initLastByInputChannels,
+            OptionalInt.empty());
+    tableAggregators.add(newAggregator);
+
     // last_by always has three channels
     for (int i = 0; i < 3; i++) {
-      schema = initAggColumnSchemas.get(initChannel + i);
+      schema = 
initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum + i));
       columnName = schema.getName();
 
       if (!aggColumnLayout.containsKey(columnName)) {
         switch (schema.getColumnCategory()) {
           case ID:
           case ATTRIBUTE:
-            aggColumnsIndexArray[channel] = 
initAggColumnsIndexArray[initChannel];
+            newAggColumnsIndexArray.add(initAggColumnsIndexArray[channelNum]);
             break;
           case MEASUREMENT:
-            aggColumnsIndexArray[channel] = measurementCount;
+            newAggColumnsIndexArray.add(measurementCount);
             measurementCount++;
             measurementColumnNames.add(schema.getName());
             measurementSchemas.add(
                 new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
             break;
           case TIME:
-            aggColumnsIndexArray[channel] = -1;
+            newAggColumnsIndexArray.add(-1);
             break;
           default:
             throw new IllegalArgumentException(
@@ -372,40 +368,144 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
         }
 
         aggColumnSchemas.add(schema);
-        aggregatorInputChannels.add(channel);
-        aggColumnLayout.put(columnName, channel++);
+        aggregatorInputChannels.add(newChannelCnt);
+        aggColumnLayout.put(columnName, newChannelCnt++);
       } else {
         aggregatorInputChannels.add(aggColumnLayout.get(columnName));
       }
 
-      aggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
+      newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
     }
   }
 
-  private void processLast(int channelCnt) {
-    ColumnSchema columnSchema = initAggColumnSchemas.get(channelCnt);
-    String columnName = columnSchema.getName();
+  private void processLast(int aggregatorNum, int channelNum) {
+    TableAggregator aggregator = initTableAggregators.get(aggregatorNum);
+    ColumnSchema schema = 
initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum));
+    String columnName = schema.getName();
+    if (schema.getColumnCategory() == TsTableColumnCategory.TIME
+        || schema.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) {
+      TimeValuePair timeValuePair =
+          TABLE_DEVICE_SCHEMA_CACHE.getLastEntry(
+              dbName, currentDeviceEntry.getDeviceID(), columnName);
+
+      if (timeValuePair != null) {
+        ColumnBuilder columnBuilder =
+            resultTsBlockBuilder.getColumnBuilder(groupKeySize + 
aggregatorNum);
+
+        if (timeValuePair == EMPTY_TIME_VALUE_PAIR) {
+          columnBuilder.appendNull();
+        } else {
+          if (aggregator.getStep().isOutputPartial()) {
+            columnBuilder.writeBinary(
+                new Binary(
+                    serializeTimeValue(
+                        timeValuePair.getValue().getDataType(),
+                        timeValuePair.getTimestamp(),
+                        false,
+                        timeValuePair.getValue())));
+          } else {
+            columnBuilder.writeTsPrimitiveType(timeValuePair.getValue());
+          }
+        }
+        return;
+      }
+
+      unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
+    }
+
+    TableAggregator newAggregator =
+        new TableAggregator(
+            aggregator.getAccumulator(),
+            aggregator.getStep(),
+            aggregator.getType(),
+            initLastInputChannels,
+            OptionalInt.empty());
+    tableAggregators.add(newAggregator);
+
+    // last always has two channels
+    for (int i = 0; i < 2; i++) {
+      schema = 
initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum + i));
+      columnName = schema.getName();
 
-    if (columnSchema.getColumnCategory() == TsTableColumnCategory.ID
-        || columnSchema.getColumnCategory() == 
TsTableColumnCategory.ATTRIBUTE) {
       if (!aggColumnLayout.containsKey(columnName)) {
-        newColumnsIndexArray.add(initAggColumnsIndexArray[channelCnt]);
+        switch (schema.getColumnCategory()) {
+          case ID:
+          case ATTRIBUTE:
+            newAggColumnsIndexArray.add(initAggColumnsIndexArray[channelNum]);
+            break;
+          case MEASUREMENT:
+            newAggColumnsIndexArray.add(measurementCount);
+            measurementCount++;
+            measurementColumnNames.add(schema.getName());
+            measurementSchemas.add(
+                new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
+            break;
+          case TIME:
+            newAggColumnsIndexArray.add(-1);
+            break;
+          default:
+            throw new IllegalArgumentException(
+                "Unexpected category: " + schema.getColumnCategory());
+        }
+
+        aggColumnSchemas.add(schema);
+        aggregatorInputChannels.add(newChannelCnt);
+        aggColumnLayout.put(columnName, newChannelCnt++);
+      } else {
+        aggregatorInputChannels.add(aggColumnLayout.get(columnName));
       }
-    } else if (columnSchema.getColumnCategory() == TsTableColumnCategory.TIME) 
{
-      aggColumnsIndexArray[channel] = -1;
-    } else {
-      // String measurement = initMeasurementColumnNames.get(i);
 
-      aggColumnsIndexArray[channel] = measurementCount;
-      measurementCount++;
-      measurementColumnNames.add(columnSchema.getName());
-      measurementSchemas.add(
-          new MeasurementSchema(columnSchema.getName(), 
getTSDataType(columnSchema.getType())));
+      newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
     }
   }
 
-  List<Integer> newColumnsIndexArray;
-  int channel = 0;
+  private boolean hasUnCachedColumns() {
+    return this.tableAggregators != null && !this.tableAggregators.isEmpty();
+  }
+
+  private void buildAggTableScanArguments() {
+    addLastTimeAggregationToAggregators();
+
+    aggColumnsIndexArray = 
newAggColumnsIndexArray.stream().mapToInt(Integer::intValue).toArray();
+    allSensors = new HashSet<>(measurementColumnNames);
+    allSensors.add("");
+
+    aggTableScanOperator =
+        new TableAggregationTableScanOperator(
+            sourceId,
+            operatorContext,
+            aggColumnSchemas,
+            aggColumnsIndexArray,
+            Collections.singletonList(currentDeviceEntry),
+            seriesScanOptions,
+            measurementColumnNames,
+            allSensors,
+            measurementSchemas,
+            tableAggregators,
+            Collections.emptyList(),
+            null,
+            timeIterator,
+            false,
+            canUseStatistics,
+            aggregatorInputChannels);
+
+    aggTableScanOperator.initQueryDataSource(this.queryDataSource);
+  }
+
+  private void addLastTimeAggregationToAggregators() {
+    // always add last(time) at the tail; updating the last cache needs the value of 
last(time)
+    int timeColumnIdx = aggColumnLayout.get("time");
+    tableAggregators.add(
+        new TableAggregator(
+            new LastDescAccumulator(TSDataType.TIMESTAMP),
+            AggregationNode.Step.FINAL,
+            TSDataType.TIMESTAMP,
+            Collections.singletonList(timeColumnIdx),
+            OptionalInt.empty()));
+
+    aggregatorInputChannels.add(timeColumnIdx);
+    aggregatorInputChannels.add(timeColumnIdx);
+  }
 
   private void appendToResultTsBlockBuilder() {
     ColumnBuilder[] columnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
@@ -414,7 +514,7 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
       if (TsTableColumnCategory.ID == 
groupingKeySchemas.get(i).getColumnCategory()) {
         String id =
             (String)
-                
initDeviceEntries.get(lastQueryDeviceIndex).getNthSegment(groupingKeyIndex[i] + 
1);
+                
initDeviceEntries.get(outputDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
         if (id == null) {
           columnBuilders[i].appendNull();
         } else {
@@ -423,7 +523,7 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
       } else {
         Binary attribute =
             initDeviceEntries
-                .get(lastQueryDeviceIndex)
+                .get(outputDeviceIndex)
                 .getAttributeColumnValues()
                 .get(groupingKeyIndex[i]);
         if (attribute == null) {
@@ -436,17 +536,25 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
     resultTsBlockBuilder.declarePosition();
   }
 
-  private void resetArguments() {
-    aggColumnLayout = new HashMap<>();
+  private void resetAggArguments() throws Exception {
+    if (aggTableScanOperator != null) {
+      aggTableScanOperator.close();
+    }
+    aggTableScanOperator = null;
 
-    aggregatorInputChannels = new ArrayList<>();
     tableAggregators = new ArrayList<>();
+    newChannelCnt = 0;
+
+    aggColumnLayout = new HashMap<>();
+    aggColumnSchemas = new ArrayList<>();
+    aggregatorInputChannels = new ArrayList<>();
+    newAggColumnsIndexArray = new ArrayList<>();
 
     measurementColumnNames = new ArrayList<>();
     measurementSchemas = new ArrayList<>();
-    newColumnsIndexArray = new ArrayList<>();
-    channel = 0;
     measurementCount = 0;
+
+    unCachedMeasurementToAggregatorIndex.clear();
   }
 
   private void buildResultTsBlock() {
@@ -466,11 +574,11 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
 
     DeviceEntry deviceEntry;
 
-    if (this.deviceEntries.isEmpty() || 
this.deviceEntries.get(this.lastQueryDeviceIndex) == null) {
+    if (this.deviceEntries.isEmpty() || 
this.deviceEntries.get(this.outputDeviceIndex) == null) {
       // for device which is not exist
       deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), 
Collections.emptyList());
     } else {
-      deviceEntry = this.deviceEntries.get(this.lastQueryDeviceIndex);
+      deviceEntry = this.deviceEntries.get(this.outputDeviceIndex);
     }
 
     AlignedFullPath alignedPath =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
index d06f9413933..2ad6b8dd089 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
@@ -65,6 +65,10 @@ public class LastByAccumulator implements TableAccumulator {
     this.xResult = TsPrimitiveType.getByType(xDataType);
   }
 
+  public boolean yIsTimeColumn() {
+    return this.yIsTimeColumn;
+  }
+
   @Override
   public long getEstimatedSize() {
     return INSTANCE_SIZE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java
index dd63fa9cdd1..7f18f593f14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java
@@ -111,6 +111,10 @@ public class TableAggregator {
     return this.inputChannels;
   }
 
+  public TableAccumulator getAccumulator() {
+    return this.accumulator;
+  }
+
   public AggregationNode.Step getStep() {
     return this.step;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index f2de6f8b7b1..951015be1e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -78,6 +78,8 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Tabl
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableLastQueryOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
@@ -1919,7 +1921,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
     context.getDriverContext().setInputDriver(true);
 
-    if (canUseLastCacheOptimize()) {
+    if (canUseLastCacheOptimize(aggregators)) {
       // context add TableLastQueryOperator
       TableLastQueryOperator lastQueryOperator =
           new TableLastQueryOperator(
@@ -2075,8 +2077,14 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return new boolean[] {canUseStatistic, isAscending};
   }
 
-  private boolean canUseLastCacheOptimize() {
-    // TODO complete this method
+  private boolean canUseLastCacheOptimize(List<TableAggregator> aggregators) {
+    for (TableAggregator aggregator : aggregators) {
+      if (!(aggregator.getAccumulator() instanceof LastDescAccumulator
+          || (aggregator.getAccumulator() instanceof LastByDescAccumulator
+              && ((LastByDescAccumulator) 
aggregator.getAccumulator()).yIsTimeColumn()))) {
+        return false;
+      }
+    }
     return true;
   }
 }

Reply via email to