This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ead283f90bfb60431a0fb7fdfbcdce14916a433b
Author: Beyyes <[email protected]>
AuthorDate: Tue Dec 17 17:46:08 2024 +0800

    add basic impl code
---
 .../TableAggregationTableScanOperator.java         |  26 ++---
 .../source/relational/TableLastQueryOperator.java  | 124 +++++++++++++++++++--
 .../plan/planner/TableOperatorGenerator.java       |  97 +++++++++-------
 3 files changed, 187 insertions(+), 60 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index b9f197adbfc..8eb700770d5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -97,11 +97,9 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
   private final List<IMeasurementSchema> measurementSchemas;
   private final List<TSDataType> measurementColumnTSDataTypes;
 
-  // TODO calc maxTsBlockLineNum using date_bin
-  private final int maxTsBlockLineNum;
-
-  // for different aggregations aiming to same column, use this variable to point to same column
-  private final List<Integer> aggArguments;
+  // stores all inputChannels of tableAggregators,
+  // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0]
+  private final List<Integer> aggregatorInputChannels;
 
   private QueryDataSource queryDataSource;
 
@@ -120,7 +118,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
       List<String> measurementColumnNames,
       Set<String> allSensors,
       List<IMeasurementSchema> measurementSchemas,
-      int maxTsBlockLineNum,
       int measurementCount,
       List<TableAggregator> tableAggregators,
       List<ColumnSchema> groupingKeySchemas,
@@ -129,7 +126,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
       boolean ascending,
       long maxReturnSize,
       boolean canUseStatistics,
-      List<Integer> aggArguments) {
+      List<Integer> aggregatorInputChannels) {
 
     super(
         sourceId,
@@ -165,7 +162,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
         measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
     this.currentDeviceIndex = 0;
     this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0));
-    this.aggArguments = aggArguments;
+    this.aggregatorInputChannels = aggregatorInputChannels;
     this.timeIterator = tableTimeRangeIterator;
     if (tableTimeRangeIterator.getType()
         == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
@@ -173,7 +170,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
     }
 
     this.maxReturnSize = maxReturnSize;
-    this.maxTsBlockLineNum = maxTsBlockLineNum;
 
     constructAlignedSeriesScanUtil();
   }
@@ -409,8 +405,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
     }
 
     TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
-    Column[] valueColumns = new Column[aggArguments.size()];
-    for (int idx : aggArguments) {
+    Column[] valueColumns = new Column[aggregatorInputChannels.size()];
+    for (int idx : aggregatorInputChannels) {
       if (valueColumns[idx] != null) {
         continue;
       }
@@ -496,10 +492,13 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
         idx++;
 
         TsTableColumnCategory columnSchemaCategory =
-            columnSchemas.get(aggArguments.get(idx)).getColumnCategory();
+            columnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
         statisticsArray[i] =
             buildStatistics(
-                columnSchemaCategory, timeStatistics, valueStatistics, aggArguments.get(idx));
+                columnSchemaCategory,
+                timeStatistics,
+                valueStatistics,
+                aggregatorInputChannels.get(idx));
       }
 
       aggregator.processStatistics(statisticsArray);
@@ -855,7 +854,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation
     this.queryDataSource = (QueryDataSource) dataSource;
     this.seriesScanUtil.initQueryDataSource(queryDataSource);
     this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
-    this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
index 20efe5e7b19..481e575340c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
@@ -23,38 +23,86 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSou
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 
+import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR;
 
 public class TableLastQueryOperator extends AbstractDataSourceOperator {
 
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(TableLastQueryOperator.class);
 
+  private static final TableDeviceSchemaCache TABLE_DEVICE_SCHEMA_CACHE =
+      TableDeviceSchemaCache.getInstance();
+
   private boolean finished = false;
   // TODO not need all table aggregators when match last cache
   private final List<TableAggregator> tableAggregators;
+  private List<TableAggregator> unCachedTableAggregators;
   private final List<ColumnSchema> groupingKeySchemas;
 
+  private final QualifiedObjectName qualifiedObjectName;
   private final List<DeviceEntry> deviceEntries;
   private int currentDeviceIndex;
+  private final List<String> measurementColumnNames;
+  private final List<IMeasurementSchema> measurementSchemas;
+  private final List<TSDataType> measurementColumnTSDataTypes;
+
+  private QueryDataSource queryDataSource;
+
+  // last_by(x,time) or last(time)
+  private final boolean[] isLastBy;
+  private final List<String> lastColumns;
+  private final List<String> lastByColumns;
+  private final int[] indexOfLastColumnInAggregators;
+  private final int[] indexOfLastByColumnInAggregators;
+  private boolean hashLastBy;
+  private boolean needCacheTimeColumn;
+  private boolean calcCacheForCurrentDevice;
+  private List<String> currentUnCacheMeasurements = new ArrayList<>();
 
   public TableLastQueryOperator(
       List<TableAggregator> tableAggregators,
       List<ColumnSchema> groupingKeySchemas,
-      List<DeviceEntry> deviceEntries) {
+      QualifiedObjectName qualifiedObjectName,
+      List<DeviceEntry> deviceEntries,
+      List<String> measurementColumnNames,
+      List<IMeasurementSchema> measurementSchemas,
+      List<TSDataType> measurementColumnTSDataTypes) {
     this.tableAggregators = tableAggregators;
     this.groupingKeySchemas = groupingKeySchemas;
+    this.qualifiedObjectName = qualifiedObjectName;
     this.deviceEntries = deviceEntries;
+    this.measurementColumnNames = measurementColumnNames;
+    this.measurementSchemas = measurementSchemas;
+    this.measurementColumnTSDataTypes = measurementColumnTSDataTypes;
+    this.isLastBy = new boolean[tableAggregators.size()];
+
+    this.lastColumns = new ArrayList<>();
+    this.lastByColumns = new ArrayList<>();
+    this.indexOfLastColumnInAggregators = new int[lastColumns.size()];
+    this.indexOfLastByColumnInAggregators = new int[lastByColumns.size()];
   }
 
   @Override
@@ -83,10 +131,10 @@ public class TableLastQueryOperator extends AbstractDataSourceOperator {
       return getResultFromRetainedTsBlock();
     }
 
-    while (!resultTsBlockBuilder.isFull()) {
-      if (processFinished() || System.nanoTime() - start > maxRuntime) {
-        break;
-      }
+    while (System.nanoTime() - start > maxRuntime
+        && !resultTsBlockBuilder.isFull()
+        && currentDeviceIndex < deviceEntries.size()) {
+      processCurrentDevice();
     }
 
     if (resultTsBlockBuilder.isEmpty()) {
@@ -97,9 +145,64 @@ public class TableLastQueryOperator extends AbstractDataSourceOperator {
     return checkTsBlockSizeAndGetResult();
   }
 
-  private boolean processFinished() {
+  /** Main process logic, calc the last aggregation results of current device. */
+  private void processCurrentDevice() {
+    // calc indexes...
+    // consider last(time), last(id), last(attr), last(measurement), last_by(xx)
+
+    // TODO need lock?
+    DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
+
+    if (!calcCacheForCurrentDevice) {
+      if (!lastByColumns.isEmpty()) {
+        Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+            TABLE_DEVICE_SCHEMA_CACHE.getLastRow(
+                qualifiedObjectName.getDatabaseName(),
+                currentDeviceEntry.getDeviceID(),
+                "",
+                lastByColumns);
+        if (!lastByResult.isPresent()) {
+          // all missed
+
+        } else {
+
+        }
+        // TODO verify id and attr columns
+      }
+
+      if (!lastColumns.isEmpty()) {
+        for (int i = 0; i < lastColumns.size(); i++) {
+          String measurement = lastByColumns.get(i);
+          TimeValuePair timeValuePair =
+              TABLE_DEVICE_SCHEMA_CACHE.getLastEntry(
+                  qualifiedObjectName.getDatabaseName(),
+                  currentDeviceEntry.getDeviceID(),
+                  measurement);
+          if (timeValuePair == null) {
+            currentUnCacheMeasurements.add(measurement);
+          } else {
+            ColumnBuilder columnBuilder =
+                resultTsBlockBuilder.getColumnBuilder(indexOfLastColumnInAggregators[i]);
+            if (timeValuePair == EMPTY_TIME_VALUE_PAIR) {
+              columnBuilder.appendNull();
+            } else {
+              columnBuilder.writeTsPrimitiveType(timeValuePair.getValue());
+            }
+          }
+        }
+      }
+
+      calcCacheForCurrentDevice = true;
+    }
+
+    // read last value from File or MemTable, update last cache
+    if (!currentUnCacheMeasurements.isEmpty()) {
+      // TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache();
 
-    return true;
+      // TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists
+
+      // init SeriesOptions
+    }
   }
 
   private void buildResultTsBlock() {
@@ -128,6 +231,13 @@ public class TableLastQueryOperator extends AbstractDataSourceOperator {
     return resultDataTypes;
   }
 
+  @Override
+  public void initQueryDataSource(IQueryDataSource dataSource) {
+    this.queryDataSource = (QueryDataSource) dataSource;
+    this.seriesScanUtil.initQueryDataSource(queryDataSource);
+    this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     // TODO
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index cbb7e6c7577..40866a9ce30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -378,21 +378,18 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
       }
     }
 
-    SeriesScanOptions.Builder scanOptionsBuilder =
-        node.getTimePredicate().isPresent()
-            ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get())
-            : new SeriesScanOptions.Builder();
-    scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
-    scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-    scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
-    scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
-
-    Expression pushDownPredicate = node.getPushDownPredicate();
-    if (pushDownPredicate != null) {
-      scanOptionsBuilder.withPushDownFilter(
-          convertPredicateToFilter(
-              pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName));
-    }
+    SeriesScanOptions seriesScanOptions =
+        buildSeriesScanOptions(
+            context,
+            columnSchemaMap,
+            measurementColumnNames,
+            measurementColumnsIndexMap,
+            timeColumnName,
+            node.getTimePredicate(),
+            node.getPushDownLimit(),
+            node.getPushDownOffset(),
+            node.isPushLimitToEachDevice(),
+            node.getPushDownPredicate());
 
     OperatorContext operatorContext =
         context
@@ -421,7 +418,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
             columnsIndexArray,
             node.getDeviceEntries(),
             node.getScanOrder(),
-            scanOptionsBuilder.build(),
+            seriesScanOptions,
             measurementColumnNames,
             allSensors,
             measurementSchemas,
@@ -1753,7 +1750,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
 
     int distinctArgumentCount = node.getAssignments().size();
     int aggregationsCount = node.getAggregations().size();
-    List<Integer> aggColumnIndexes = new ArrayList<>();
+    List<Integer> aggregatorInputChannels = new ArrayList<>();
     int channel = 0;
     int measurementColumnCount = 0;
     Map<Symbol, Integer> idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap();
@@ -1804,10 +1801,10 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
         }
 
         if (!columnLayout.containsKey(symbol)) {
-          aggColumnIndexes.add(channel);
+          aggregatorInputChannels.add(channel);
           columnLayout.put(symbol, channel++);
         } else {
-          aggColumnIndexes.add(columnLayout.get(symbol));
+          aggregatorInputChannels.add(columnLayout.get(symbol));
         }
       }
     }
@@ -1894,25 +1891,21 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 TableAggregationTableScanOperator.class.getSimpleName());
-    SeriesScanOptions.Builder scanOptionsBuilder =
-        node.getTimePredicate().isPresent()
-            ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get())
-            : new SeriesScanOptions.Builder();
-    scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
-    scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-    scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
-    scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
-    Expression pushDownPredicate = node.getPushDownPredicate();
-    if (pushDownPredicate != null) {
-      scanOptionsBuilder.withPushDownFilter(
-          convertPredicateToFilter(
-              pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName));
-    }
+    SeriesScanOptions seriesScanOptions =
+        buildSeriesScanOptions(
+            context,
+            columnSchemaMap,
+            measurementColumnNames,
+            measurementColumnsIndexMap,
+            timeColumnName,
+            node.getTimePredicate(),
+            node.getPushDownLimit(),
+            node.getPushDownOffset(),
+            node.isPushLimitToEachDevice(),
+            node.getPushDownPredicate());
 
     Set<String> allSensors = new HashSet<>(measurementColumnNames);
-    // for time column
-    allSensors.add("");
-
+    allSensors.add(""); // for time column
     TableAggregationTableScanOperator aggTableScanOperator =
         new TableAggregationTableScanOperator(
             node.getPlanNodeId(),
@@ -1921,11 +1914,10 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
             columnsIndexArray,
             node.getDeviceEntries(),
             scanAscending ? Ordering.ASC : Ordering.DESC,
-            scanOptionsBuilder.build(),
+            seriesScanOptions,
             measurementColumnNames,
             allSensors,
             measurementSchemas,
-            TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
             measurementColumnCount,
             aggregators,
             groupingKeySchemas,
@@ -1934,7 +1926,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
             scanAscending,
             calculateMaxAggregationResultSize(),
             canUseStatistic,
-            aggColumnIndexes);
+            aggregatorInputChannels);
 
     ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator);
 
@@ -1957,6 +1949,33 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
     return aggTableScanOperator;
   }
 
+  private SeriesScanOptions buildSeriesScanOptions(
+      LocalExecutionPlanContext context,
+      Map<Symbol, ColumnSchema> columnSchemaMap,
+      List<String> measurementColumnNames,
+      Map<String, Integer> measurementColumnsIndexMap,
+      String timeColumnName,
+      Optional<Expression> timePredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice,
+      Expression pushDownPredicate) {
+    SeriesScanOptions.Builder scanOptionsBuilder =
+        timePredicate
+            .map(expression -> getSeriesScanOptionsBuilder(context, expression))
+            .orElseGet(SeriesScanOptions.Builder::new);
+    scanOptionsBuilder.withPushDownLimit(pushDownLimit);
+    scanOptionsBuilder.withPushDownOffset(pushDownOffset);
+    scanOptionsBuilder.withPushLimitToEachDevice(pushLimitToEachDevice);
+    scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
+    if (pushDownPredicate != null) {
+      scanOptionsBuilder.withPushDownFilter(
+          convertPredicateToFilter(
+              pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName));
+    }
+    return scanOptionsBuilder.build();
+  }
+
   @Override
   public Operator visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecutionPlanContext context) {
     Operator operator = node.getChild().accept(this, context);

Reply via email to