This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 89a16f360a3676ce268e0d5adf58f1d0da201c40
Author: Beyyes <[email protected]>
AuthorDate: Tue Dec 17 11:05:07 2024 +0800

    fix
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../TableAggregationTableScanOperator.java         |  15 +-
 .../source/relational/TableLastQueryOperator.java  | 151 +++++++++++++++++++++
 .../plan/planner/OperatorTreeGenerator.java        |   2 +-
 .../plan/planner/TableOperatorGenerator.java       |  14 +-
 5 files changed, 166 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 47707e53170..de55094dab4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -638,7 +638,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 60000_0000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e963963bf7d..b9f197adbfc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -49,7 +49,6 @@ import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.BinaryColumn;
-import org.apache.tsfile.read.common.block.column.LongColumn;
 import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
@@ -68,6 +67,7 @@ import java.util.stream.Collectors;
 import static java.lang.String.format;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
 import static 
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
@@ -82,25 +82,19 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   private final List<ColumnSchema> groupingKeySchemas;
   private final int[] groupingKeyIndex;
 
-  public static final LongColumn TIME_COLUMN_TEMPLATE =
-      new LongColumn(1, Optional.empty(), new long[] {0});
-
   private final List<ColumnSchema> columnSchemas;
-
   private final int[] columnsIndexArray;
 
   private final List<DeviceEntry> deviceEntries;
-
   private final int deviceCount;
+  private int currentDeviceIndex;
 
   private final Ordering scanOrder;
   private final SeriesScanOptions seriesScanOptions;
 
   private final List<String> measurementColumnNames;
   private final Set<String> allSensors;
-
   private final List<IMeasurementSchema> measurementSchemas;
-
   private final List<TSDataType> measurementColumnTSDataTypes;
 
   // TODO calc maxTsBlockLineNum using date_bin
@@ -111,8 +105,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
 
   private QueryDataSource queryDataSource;
 
-  private int currentDeviceIndex;
-
   ITableTimeRangeIterator timeIterator;
 
   private boolean allAggregatorsHasFinalResult = false;
@@ -192,9 +184,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       finished = !hasNextWithTimer();
     }
     return finished;
-
-    //    return (retainedTsBlock == null)
-    //        && (currentDeviceIndex >= deviceCount || 
seriesScanOptions.limitConsumedUp());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
new file mode 100644
index 00000000000..20efe5e7b19
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+
+public class TableLastQueryOperator extends AbstractDataSourceOperator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableLastQueryOperator.class);
+
+  private boolean finished = false;
+  // TODO not need all table aggregators when match last cache
+  private final List<TableAggregator> tableAggregators;
+  private final List<ColumnSchema> groupingKeySchemas;
+
+  private final List<DeviceEntry> deviceEntries;
+  private int currentDeviceIndex;
+
+  public TableLastQueryOperator(
+      List<TableAggregator> tableAggregators,
+      List<ColumnSchema> groupingKeySchemas,
+      List<DeviceEntry> deviceEntries) {
+    this.tableAggregators = tableAggregators;
+    this.groupingKeySchemas = groupingKeySchemas;
+    this.deviceEntries = deviceEntries;
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    if (!finished) {
+      finished = !hasNextWithTimer();
+    }
+    return finished;
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (retainedTsBlock != null) {
+      return true;
+    }
+
+    return currentDeviceIndex < deviceEntries.size();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+
+    if (retainedTsBlock != null) {
+      return getResultFromRetainedTsBlock();
+    }
+
+    while (!resultTsBlockBuilder.isFull()) {
+      if (processFinished() || System.nanoTime() - start > maxRuntime) {
+        break;
+      }
+    }
+
+    if (resultTsBlockBuilder.isEmpty()) {
+      return null;
+    }
+
+    buildResultTsBlock();
+    return checkTsBlockSizeAndGetResult();
+  }
+
+  private boolean processFinished() {
+
+    return true;
+  }
+
+  private void buildResultTsBlock() {
+    resultTsBlock =
+        resultTsBlockBuilder.build(
+            new RunLengthEncodedColumn(
+                TIME_COLUMN_TEMPLATE, 
resultTsBlockBuilder.getPositionCount()));
+    resultTsBlockBuilder.reset();
+  }
+
+  @Override
+  protected List<TSDataType> getResultDataTypes() {
+    int groupingKeySize = groupingKeySchemas != null ? 
groupingKeySchemas.size() : 0;
+    List<TSDataType> resultDataTypes = new ArrayList<>(groupingKeySize + 
tableAggregators.size());
+
+    if (groupingKeySchemas != null) {
+      for (int i = 0; i < groupingKeySchemas.size(); i++) {
+        resultDataTypes.add(TSDataType.STRING);
+      }
+    }
+
+    for (TableAggregator aggregator : tableAggregators) {
+      resultDataTypes.add(aggregator.getType());
+    }
+
+    return resultDataTypes;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // TODO
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 9776f3fee68..e3fcf32221e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -3056,8 +3056,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       final MeasurementPath measurementPath =
           devicePath.concatAsMeasurementPath(measurementList.get(i));
       TimeValuePair timeValuePair = null;
+      context.dataNodeQueryContext.lock();
       try {
-        context.dataNodeQueryContext.lock();
         if (!context.dataNodeQueryContext.unCached(measurementPath)) {
           timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
           if (timeValuePair == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7dcdbe04c95..cbb7e6c7577 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -1755,7 +1755,6 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     int aggregationsCount = node.getAggregations().size();
     List<Integer> aggColumnIndexes = new ArrayList<>();
     int channel = 0;
-    int idx = -1;
     int measurementColumnCount = 0;
     Map<Symbol, Integer> idAndAttributeColumnsIndexMap = 
node.getIdAndAttributeIndexMap();
     Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments();
@@ -1770,7 +1769,6 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
       AggregationNode.Aggregation aggregation = entry.getValue();
 
       for (Expression argument : aggregation.getArguments()) {
-        idx++;
         Symbol symbol = Symbol.from(argument);
         ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol), 
symbol + " is null");
         switch (schema.getColumnCategory()) {
@@ -1914,6 +1912,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     Set<String> allSensors = new HashSet<>(measurementColumnNames);
     // for time column
     allSensors.add("");
+
     TableAggregationTableScanOperator aggTableScanOperator =
         new TableAggregationTableScanOperator(
             node.getPlanNodeId(),
@@ -1939,7 +1938,11 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
 
-    for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
+    if (canUseLastCacheOptimize()) {
+      // context add TableLastQueryOperator
+    }
+
+    for (int i = 0; i < node.getDeviceEntries().size(); i++) {
       AlignedFullPath alignedPath =
           constructAlignedPath(
               node.getDeviceEntries().get(i),
@@ -2036,6 +2039,11 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return new boolean[] {canUseStatistic, isAscending};
   }
 
+  private boolean canUseLastCacheOptimize() {
+    // TODO complete this method
+    return true;
+  }
+
   public static long calculateMaxAggregationResultSize(
       // List<? extends AggregationDescriptor> aggregationDescriptors,
       // ITimeRangeIterator timeRangeIterator

Reply via email to