This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/last_cache by this push:
     new 152396c9c3d fix groupKeySize + unCachedMeasurementToAggregatorIndex.get(i) in appendAggregationResult
152396c9c3d is described below

commit 152396c9c3d4b85a69e8de45fab8b268bb598ab6
Author: Beyyes <[email protected]>
AuthorDate: Mon Dec 23 15:57:48 2024 +0800

    fix groupKeySize + unCachedMeasurementToAggregatorIndex.get(i) in appendAggregationResult
---
 .../TableAggregationTableScanOperator.java         |   7 +-
 .../source/relational/TableLastQueryOperator.java  | 365 ++++++++++-----------
 .../relational/aggregation/LastAccumulator.java    |  12 +
 .../relational/aggregation/LastByAccumulator.java  |  20 +-
 .../aggregation/LastByDescAccumulator.java         |   5 +
 .../aggregation/LastDescAccumulator.java           |   5 +
 .../plan/planner/TableOperatorGenerator.java       |   7 +
 7 files changed, 233 insertions(+), 188 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index bfeba58b265..1696db75e74 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -86,7 +86,7 @@ public class TableAggregationTableScanOperator extends 
AbstractDataSourceOperato
 
   protected final List<DeviceEntry> deviceEntries;
   protected final int deviceCount;
-  private int currentDeviceIndex;
+  protected int currentDeviceIndex;
   protected List<String> measurementColumnNames;
   protected Set<String> allSensors;
   protected List<IMeasurementSchema> measurementSchemas;
@@ -121,6 +121,7 @@ public class TableAggregationTableScanOperator extends 
AbstractDataSourceOperato
       List<ColumnSchema> aggColumnSchemas,
       int[] aggColumnsIndexArray,
       List<DeviceEntry> deviceEntries,
+      int deviceCount,
       SeriesScanOptions seriesScanOptions,
       List<String> measurementColumnNames,
       Set<String> allSensors,
@@ -142,7 +143,7 @@ public class TableAggregationTableScanOperator extends 
AbstractDataSourceOperato
     this.aggColumnSchemas = aggColumnSchemas;
     this.aggColumnsIndexArray = aggColumnsIndexArray;
     this.deviceEntries = deviceEntries;
-    this.deviceCount = deviceEntries.size();
+    this.deviceCount = deviceCount;
     this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
     this.ascending = ascending;
     this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
@@ -822,7 +823,7 @@ public class TableAggregationTableScanOperator extends 
AbstractDataSourceOperato
         CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
   }
 
-  private void resetTableAggregators() {
+  protected void resetTableAggregators() {
     tableAggregators.forEach(TableAggregator::reset);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
index 7e5e6abdd32..b9ef991ecb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
@@ -47,6 +47,7 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
@@ -79,9 +80,9 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
 
   private boolean finished = false;
   private boolean fetchLastCacheForCurrentDevice;
+  private boolean hasBuildAggTableScanArguments;
   private int outputDeviceIndex;
   private DeviceEntry currentDeviceEntry;
-  private TableAggregationTableScanOperator aggTableScanOperator;
   private Map<String, Integer> aggColumnLayout;
   private int newChannelCnt = 0;
   private List<Integer> newAggColumnsIndexArray;
@@ -129,6 +130,7 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
         null,
         null,
         new ArrayList<>(1),
+        1,
         seriesScanOptions,
         Collections.emptyList(),
         null,
@@ -201,99 +203,97 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
   }
 
   /** Main process logic, calc the last aggregation results of current device. 
*/
-  private void processCurrentDevice() throws Exception {
+  private void processCurrentDevice() {
     currentDeviceEntry = initDeviceEntries.get(outputDeviceIndex);
 
     if (!fetchLastCacheForCurrentDevice) {
       resetAggArguments();
 
-      int initChannel = 0;
-      for (int i = 0; i < initTableAggregators.size(); i++) {
-        TableAggregator aggregator = initTableAggregators.get(i);
-        if (isLastByArray[i]) {
-          processLastBy(i, initChannel);
+      int channelNum = 0;
+      for (int aggregatorNum = 0; aggregatorNum < initTableAggregators.size(); 
aggregatorNum++) {
+        TableAggregator aggregator = initTableAggregators.get(aggregatorNum);
+        if (isLastByArray[aggregatorNum]) {
+          processLastBy(aggregatorNum, channelNum);
         } else {
-          processLast(i, initChannel);
+          processLast(aggregatorNum, channelNum);
         }
-        initChannel += aggregator.getChannelCount();
+        channelNum += aggregator.getChannelCount();
       }
 
       fetchLastCacheForCurrentDevice = true;
     }
 
     if (hasUnCachedColumns()) {
-      if (aggTableScanOperator == null) {
+      if (!hasBuildAggTableScanArguments) {
         buildAggTableScanArguments();
       }
 
-      //      if (!aggTableScanOperator.hasNext()) {
-      //        for (int i = 0; i < tableAggregators.size() - 1; i++) {
-      //          resultTsBlockBuilder
-      //              .getValueColumnBuilders()[groupKeySize +
-      // unCachedMeasurementToAggregatorIndex.get(i)]
-      //              .appendNull();
-      //        }
-      //        outputDeviceIndex++;
-      //        fetchLastCacheForCurrentDevice = false;
-      //        appendToResultTsBlockBuilder();
-      //        return;
-      //      }
-
-      //      TsBlock updateBlock = aggTableScanOperator.next();
       if (calculateAggregationResultForCurrentTimeRange()) {
         int channel = 0;
         List<String> updateMeasurementList = new ArrayList<>();
         List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
-        for (int i = 0; i < tableAggregators.size(); i++) {
-          ColumnBuilder valueColumnBuilder =
-              resultTsBlockBuilder
-                  .getValueColumnBuilders()[
-                  groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)];
-          //          if (updateBlock.getColumn(i).isNull(0)) {
-          //            valueColumnBuilder.appendNull();
-          //          } else {
-          //            valueColumnBuilder
-          //                
.writeBinary(updateBlock.getColumn(i).getBinary(0));
-          tableAggregators.get(i).evaluate(valueColumnBuilder);
-          //          }
-
-          boolean isLastBy =
-              tableAggregators.get(i).getAccumulator() instanceof 
LastByDescAccumulator;
-
-          //          if (!isLastBy || !updateBlock.getColumn(i).isNull(0)) {
-          //            ColumnSchema schema =
-          // aggColumnSchemas.get(aggregatorInputChannels.get(channel));
-          //            if (schema.getColumnCategory() == 
TsTableColumnCategory.TIME
-          //                || schema.getColumnCategory() == 
TsTableColumnCategory.MEASUREMENT) {
-          //              updateMeasurementList.add(
-          //                  schema.getColumnCategory() == 
TsTableColumnCategory.TIME ? "" :
-          // schema.getName());
-          //              TimeValuePair tv =
-          //                  new TimeValuePair(
-          //
-          // (LastDescAccumulator)(tableAggregators.get(i).getAccumulator()).
-          //                      
updateBlock.getColumn(tableAggregators.size() - 1).getLong(0),
-          //                      
updateBlock.getColumn(i).getTsPrimitiveType(0));
-          //                updateTimeValuePairList.add(tv);
-          //            }
-          //          }
+        for (TableAggregator tableAggregator : tableAggregators) {
+          ColumnSchema schema = 
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+          if (schema.getColumnCategory() != TsTableColumnCategory.TIME
+              && schema.getColumnCategory() != 
TsTableColumnCategory.MEASUREMENT) {
+            // only time and measurement column can update last cache
+            continue;
+          }
+
+          boolean isLastBy = tableAggregator.getAccumulator() instanceof 
LastByDescAccumulator;
+          if (!isLastBy) {
+            LastDescAccumulator lastAccumulator =
+                (LastDescAccumulator) tableAggregator.getAccumulator();
+            if (lastAccumulator.hasInitResult()) {
+              updateMeasurementList.add(
+                  schema.getColumnCategory() == TsTableColumnCategory.TIME ? 
"" : schema.getName());
+              TimeValuePair tv =
+                  new TimeValuePair(
+                      lastAccumulator.getMaxTime(),
+                      cloneTsPrimitiveType(lastAccumulator.getLastValue()));
+              updateTimeValuePairList.add(tv);
+            }
+          } else {
+            // last_by return non-null value
+            LastByDescAccumulator lastByAccumulator =
+                (LastByDescAccumulator) tableAggregator.getAccumulator();
+            if (lastByAccumulator.hasInitResult()) {
+              updateMeasurementList.add(
+                  schema.getColumnCategory() == TsTableColumnCategory.TIME ? 
"" : schema.getName());
+              long lastTime = lastByAccumulator.getLastTimeOfY();
+              TimeValuePair tv =
+                  lastByAccumulator.isXNull()
+                      ? EMPTY_TIME_VALUE_PAIR
+                      : new TimeValuePair(
+                          lastTime, 
cloneTsPrimitiveType(lastByAccumulator.getXResult()));
+              updateTimeValuePairList.add(tv);
+            }
+          }
+        }
+
+        if (!updateMeasurementList.isEmpty()) {
+          String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
+          TimeValuePair[] updateTimeValuePairArray =
+              updateTimeValuePairList.toArray(new TimeValuePair[0]);
+
+          TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
+              dbName, currentDeviceEntry.getDeviceID(), 
updateMeasurementArray, false);
+          TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+              dbName,
+              currentDeviceEntry.getDeviceID(),
+              updateMeasurementArray,
+              updateTimeValuePairArray);
         }
-        String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
-        TimeValuePair[] updateTimeValuePairArray =
-            updateTimeValuePairList.toArray(new TimeValuePair[0]);
-
-        //        TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
-        //            dbName, currentDeviceEntry.getDeviceID(), 
updateMeasurementArray, false);
-        //        TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
-        //            dbName,
-        //            currentDeviceEntry.getDeviceID(),
-        //            updateMeasurementArray,
-        //            updateTimeValuePairArray);
 
         outputDeviceIndex++;
         fetchLastCacheForCurrentDevice = false;
-        appendToResultTsBlockBuilder();
+        appendGroupByToResult();
+        resetTableAggregators();
       }
+    } else {
+      outputDeviceIndex++;
+      fetchLastCacheForCurrentDevice = false;
+      resultTsBlockBuilder.declarePosition();
     }
   }
 
@@ -307,11 +307,10 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
           TABLE_DEVICE_SCHEMA_CACHE.getLastRow(
               dbName, currentDeviceEntry.getDeviceID(), "", 
Collections.singletonList(columnName));
 
-      if (lastByResult.isPresent() && lastByResult.get().getRight()[0] != 
null) {
-        TsPrimitiveType timeValuePair = lastByResult.get().getRight()[0];
+      if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
         ColumnBuilder columnBuilder =
             resultTsBlockBuilder.getColumnBuilder(groupKeySize + 
aggregatorNum);
-
+        TsPrimitiveType timeValuePair = lastByResult.get().getRight()[0];
         if (timeValuePair == null) {
           columnBuilder.appendNull();
         } else {
@@ -327,12 +326,12 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
             columnBuilder.writeTsPrimitiveType(timeValuePair);
           }
         }
+
         return;
       }
     }
 
     unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
-
     TableAggregator newAggregator =
         new TableAggregator(
             aggregator.getAccumulator(),
@@ -344,39 +343,7 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
 
     // last_by always has three channels
     for (int i = 0; i < 3; i++) {
-      int aggIdx = initAggregatorInputChannels.get(channelNum + i);
-      schema = initAggColumnSchemas.get(aggIdx);
-      columnName = schema.getName();
-
-      if (!aggColumnLayout.containsKey(columnName)) {
-        switch (schema.getColumnCategory()) {
-          case ID:
-          case ATTRIBUTE:
-            newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
-            break;
-          case MEASUREMENT:
-            newAggColumnsIndexArray.add(measurementCount);
-            measurementCount++;
-            measurementColumnNames.add(schema.getName());
-            measurementSchemas.add(
-                new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
-            break;
-          case TIME:
-            newAggColumnsIndexArray.add(-1);
-            break;
-          default:
-            throw new IllegalArgumentException(
-                "Unexpected category: " + schema.getColumnCategory());
-        }
-
-        aggColumnSchemas.add(schema);
-        aggregatorInputChannels.add(newChannelCnt);
-        aggColumnLayout.put(columnName, newChannelCnt++);
-      } else {
-        aggregatorInputChannels.add(aggColumnLayout.get(columnName));
-      }
-
-      newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
+      buildNewAggregators(channelNum, newAggregator, i);
     }
   }
 
@@ -409,12 +376,12 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
             columnBuilder.writeTsPrimitiveType(timeValuePair.getValue());
           }
         }
+
         return;
       }
     }
 
     unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
-
     TableAggregator newAggregator =
         new TableAggregator(
             aggregator.getAccumulator(),
@@ -426,40 +393,45 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
 
     // last_by always has two channels
     for (int i = 0; i < 2; i++) {
-      int aggIdx = initAggregatorInputChannels.get(channelNum + i);
-      schema = initAggColumnSchemas.get(aggIdx);
-      columnName = schema.getName();
-
-      if (!aggColumnLayout.containsKey(columnName)) {
-        switch (schema.getColumnCategory()) {
-          case ID:
-          case ATTRIBUTE:
-            newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
-            break;
-          case MEASUREMENT:
-            newAggColumnsIndexArray.add(measurementCount);
-            measurementCount++;
-            measurementColumnNames.add(schema.getName());
-            measurementSchemas.add(
-                new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
-            break;
-          case TIME:
-            newAggColumnsIndexArray.add(-1);
-            break;
-          default:
-            throw new IllegalArgumentException(
-                "Unexpected category: " + schema.getColumnCategory());
-        }
+      buildNewAggregators(channelNum, newAggregator, i);
+    }
+  }
 
-        aggColumnSchemas.add(schema);
-        aggregatorInputChannels.add(newChannelCnt);
-        aggColumnLayout.put(columnName, newChannelCnt++);
-      } else {
-        aggregatorInputChannels.add(aggColumnLayout.get(columnName));
+  private void buildNewAggregators(int channelNum, TableAggregator 
newAggregator, int i) {
+    ColumnSchema schema;
+    String columnName;
+    int aggIdx = initAggregatorInputChannels.get(channelNum + i);
+    schema = initAggColumnSchemas.get(aggIdx);
+    columnName = schema.getName();
+
+    if (!aggColumnLayout.containsKey(columnName)) {
+      switch (schema.getColumnCategory()) {
+        case ID:
+        case ATTRIBUTE:
+          newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
+          break;
+        case MEASUREMENT:
+          newAggColumnsIndexArray.add(measurementCount);
+          measurementCount++;
+          measurementColumnNames.add(schema.getName());
+          measurementSchemas.add(
+              new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
+          break;
+        case TIME:
+          newAggColumnsIndexArray.add(-1);
+          break;
+        default:
+          throw new IllegalArgumentException("Unexpected category: " + 
schema.getColumnCategory());
       }
 
-      newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
+      aggColumnSchemas.add(schema);
+      aggregatorInputChannels.add(newChannelCnt);
+      aggColumnLayout.put(columnName, newChannelCnt++);
+    } else {
+      aggregatorInputChannels.add(aggColumnLayout.get(columnName));
     }
+
+    newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
   }
 
   private boolean hasUnCachedColumns() {
@@ -483,31 +455,14 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
     //            seriesScanOptions.getPushLimitToEachDevice());
     deviceEntries.clear();
     deviceEntries.add(currentDeviceEntry);
+    currentDeviceIndex = 0;
 
     this.measurementColumnTSDataTypes =
         
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
     constructAlignedSeriesScanUtil();
     this.seriesScanUtil.initQueryDataSource(queryDataSource);
 
-    aggTableScanOperator =
-        new TableAggregationTableScanOperator(
-            sourceId,
-            operatorContext,
-            aggColumnSchemas,
-            aggColumnsIndexArray,
-            Collections.singletonList(currentDeviceEntry),
-            seriesScanOptions,
-            measurementColumnNames,
-            allSensors,
-            measurementSchemas,
-            tableAggregators,
-            Collections.emptyList(),
-            null,
-            timeIterator,
-            false,
-            canUseStatistics,
-            aggregatorInputChannels);
-
+    this.hasBuildAggTableScanArguments = true;
     // aggTableScanOperator.initQueryDataSource(this.queryDataSource);
   }
 
@@ -528,45 +483,63 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
 
   @Override
   protected void updateResultTsBlock() {
-    // appendAggregationResult(resultTsBlockBuilder, tableAggregators);
+    appendAggregationResult(resultTsBlockBuilder, tableAggregators);
     // after appendAggregationResult invoked, aggregators must be cleared
     // resetTableAggregators();
   }
 
-  private void appendToResultTsBlockBuilder() {
+  @Override
+  /** Append a row of aggregation results to the result tsBlock. */
+  public void appendAggregationResult(
+      TsBlockBuilder tsBlockBuilder, List<? extends TableAggregator> 
aggregators) {
+
+    // no data in current time range, just output empty
+    if (!timeIterator.hasCachedTimeRange()) {
+      return;
+    }
+
+    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+    for (int i = 0; i < aggregators.size(); i++) {
+      aggregators
+          .get(i)
+          .evaluate(columnBuilders[groupKeySize + 
unCachedMeasurementToAggregatorIndex.get(i)]);
+    }
+
+    tsBlockBuilder.declarePosition();
+  }
+
+  private void appendGroupByToResult() {
     ColumnBuilder[] columnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
 
-    for (int i = 0; i < groupKeySize; i++) {
-      if (TsTableColumnCategory.ID == 
groupingKeySchemas.get(i).getColumnCategory()) {
-        String id =
-            (String)
-                
initDeviceEntries.get(outputDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
-        if (id == null) {
-          columnBuilders[i].appendNull();
-        } else {
-          columnBuilders[i].writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
-        }
-      } else {
-        Binary attribute =
-            initDeviceEntries
-                .get(outputDeviceIndex)
-                .getAttributeColumnValues()
-                .get(groupingKeyIndex[i]);
-        if (attribute == null) {
-          columnBuilders[i].appendNull();
+    if (groupingKeyIndex != null) {
+      for (int i = 0; i < groupKeySize; i++) {
+        if (TsTableColumnCategory.ID == 
groupingKeySchemas.get(i).getColumnCategory()) {
+          String id =
+              (String) 
deviceEntries.get(currentDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
+          if (id == null) {
+            columnBuilders[i].appendNull();
+          } else {
+            columnBuilders[i].writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+          }
         } else {
-          columnBuilders[i].writeBinary(attribute);
+          Binary attribute =
+              deviceEntries
+                  .get(currentDeviceIndex)
+                  .getAttributeColumnValues()
+                  .get(groupingKeyIndex[i]);
+          if (attribute == null) {
+            columnBuilders[i].appendNull();
+          } else {
+            columnBuilders[i].writeBinary(attribute);
+          }
         }
       }
     }
-    resultTsBlockBuilder.declarePosition();
   }
 
-  private void resetAggArguments() throws Exception {
-    if (aggTableScanOperator != null) {
-      aggTableScanOperator.close();
-    }
-    aggTableScanOperator = null;
+  private void resetAggArguments() {
+    hasBuildAggTableScanArguments = false;
 
     tableAggregators = new ArrayList<>();
     newChannelCnt = 0;
@@ -591,6 +564,32 @@ public class TableLastQueryOperator extends 
TableAggregationTableScanOperator {
     resultTsBlockBuilder.reset();
   }
 
+  private TsPrimitiveType cloneTsPrimitiveType(TsPrimitiveType originalValue) {
+    switch (originalValue.getDataType()) {
+      case BOOLEAN:
+        return new TsPrimitiveType.TsBoolean(originalValue.getBoolean());
+      case INT32:
+      case DATE:
+        return new TsPrimitiveType.TsInt(originalValue.getInt());
+      case INT64:
+      case TIMESTAMP:
+        return new TsPrimitiveType.TsLong(originalValue.getLong());
+      case FLOAT:
+        return new TsPrimitiveType.TsFloat(originalValue.getFloat());
+      case DOUBLE:
+        return new TsPrimitiveType.TsDouble(originalValue.getDouble());
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return new TsPrimitiveType.TsBinary(originalValue.getBinary());
+      case VECTOR:
+        return new TsPrimitiveType.TsVector(originalValue.getVector());
+      default:
+        throw new UnSupportedDataTypeException(
+            "Unsupported data type:" + originalValue.getDataType());
+    }
+  }
+
   //  @Override
   //  protected void constructAlignedSeriesScanUtil() {
   //    // TODO?
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
index ab9f876d7e3..c29d5ec6faf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
@@ -49,6 +49,18 @@ public class LastAccumulator implements TableAccumulator {
     lastValue = TsPrimitiveType.getByType(seriesDataType);
   }
 
+  public boolean hasInitResult() {
+    return this.initResult;
+  }
+
+  public long getMaxTime() {
+    return this.maxTime;
+  }
+
+  public TsPrimitiveType getLastValue() {
+    return this.lastValue;
+  }
+
   @Override
   public long getEstimatedSize() {
     return INSTANCE_SIZE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
index 2ad6b8dd089..1605f7d6d3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
@@ -45,8 +45,8 @@ public class LastByAccumulator implements TableAccumulator {
   protected final TSDataType xDataType;
   protected final TSDataType yDataType;
 
-  private final boolean xIsTimeColumn;
-  private final boolean yIsTimeColumn;
+  protected final boolean xIsTimeColumn;
+  protected final boolean yIsTimeColumn;
 
   private long yLastTime = Long.MIN_VALUE;
 
@@ -69,6 +69,22 @@ public class LastByAccumulator implements TableAccumulator {
     return this.yIsTimeColumn;
   }
 
+  public boolean hasInitResult() {
+    return this.initResult;
+  }
+
+  public long getLastTimeOfY() {
+    return this.yLastTime;
+  }
+
+  public boolean isXNull() {
+    return xIsNull;
+  }
+
+  public TsPrimitiveType getXResult() {
+    return this.xResult;
+  }
+
   @Override
   public long getEstimatedSize() {
     return INSTANCE_SIZE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
index a44b41d6223..d6e81aacc96 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
@@ -29,6 +29,11 @@ public class LastByDescAccumulator extends LastByAccumulator 
{
     super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
   }
 
+  @Override
+  public TableAccumulator copy() {
+    return new LastByDescAccumulator(xDataType, yDataType, xIsTimeColumn, 
yIsTimeColumn);
+  }
+
   @Override
   public boolean hasFinalResult() {
     return initResult;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
index bc0e97f9b7b..0fad4cec0db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
@@ -28,6 +28,11 @@ public class LastDescAccumulator extends LastAccumulator {
     super(seriesDataType);
   }
 
+  @Override
+  public TableAccumulator copy() {
+    return new LastDescAccumulator(seriesDataType);
+  }
+
   @Override
   public boolean hasFinalResult() {
     return initResult;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 951015be1e9..1bb0161e665 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.planner;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.schema.column.ColumnHeader;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -1952,6 +1953,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
               aggColumnSchemas,
               aggColumnsIndexArray,
               node.getDeviceEntries(),
+              node.getDeviceEntries().size(),
               seriesScanOptions,
               measurementColumnNames,
               allSensors,
@@ -2078,6 +2080,10 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
   }
 
   private boolean canUseLastCacheOptimize(List<TableAggregator> aggregators) {
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && 
aggregators.isEmpty()) {
+      return false;
+    }
+
     for (TableAggregator aggregator : aggregators) {
       if (!(aggregator.getAccumulator() instanceof LastDescAccumulator
           || (aggregator.getAccumulator() instanceof LastByDescAccumulator
@@ -2085,6 +2091,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         return false;
       }
     }
+
     return true;
   }
 }

Reply via email to