This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6479ad912d3b7a9a1a671081526dfc1b9a600bf0
Author: Beyyes <[email protected]>
AuthorDate: Fri Oct 25 16:06:11 2024 +0800

    fix it
---
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  |   9 +-
 .../TableAggregationTableScanOperator.java         | 117 +++++++++++----------
 .../aggregation/TableModeAccumulator.java          |   8 +-
 .../plan/planner/TableOperatorGenerator.java       |  28 -----
 4 files changed, 68 insertions(+), 94 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index cc87922666d..c97c0fcab50 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -615,7 +615,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
         retArray,
         DATABASE_NAME);
 
-    expectedHeader = buildHeaders(2);
+    expectedHeader = buildHeaders(3);
     sql =
         "select count(*),count(t1),sum(t1) from (select avg(num+1) as t1 from table0 where time < 0)";
     retArray =
@@ -908,15 +908,16 @@ public class IoTDBMultiIDsWithAttributesTableIT {
     tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
 
     // flush multi times, generated multi tsfile
-    expectedHeader = buildHeaders(1);
-    sql = "select date_bin(40ms,time), first(time) from table1 where device='d11' group by 1";
+    expectedHeader = buildHeaders(2);
+    sql = "select date_bin(30ms,time), first(time) from table1 where device='d11' group by 1";
     retArray =
         new String[] {
           "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,",
-          "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,"
+          "1970-01-01T00:00:00.030Z,1970-01-01T00:00:00.030Z,"
         };
     tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
 
+    expectedHeader = buildHeaders(1);
     sql =
         "select count(*) from ("
             + "select device, level, date_bin(1d, time) as bin, "
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e5b3e2b1b3f..fe086fd9e13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -24,7 +24,7 @@ import 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow;
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
@@ -67,53 +68,54 @@ import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 
-public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator {
+public class TableAggregationTableScanOperator extends AbstractDataSourceOperator {
+
+  public static final LongColumn TIME_COLUMN_TEMPLATE =
+      new LongColumn(1, Optional.empty(), new long[] {0});
 
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class);
+  // can not calc maxTsBlockLineNum using date_bin
+  private final int maxTsBlockLineNum =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+  private final long cachedRawDataSize;
 
-  private final List<TableAggregator> tableAggregators;
+  private boolean finished = false;
 
+  private final List<TableAggregator> tableAggregators;
   private final List<ColumnSchema> groupingKeySchemas;
   private final int[] groupingKeyIndex;
-
-  public static final LongColumn TIME_COLUMN_TEMPLATE =
-      new LongColumn(1, Optional.empty(), new long[] {0});
+  // for different aggregations aiming to same column, use this variable to point to same column
+  private final List<Integer> aggArguments;
 
   private final List<ColumnSchema> columnSchemas;
-
   private final int[] columnsIndexArray;
 
   private final List<DeviceEntry> deviceEntries;
-
   private final int deviceCount;
-
-  private final Ordering scanOrder;
-  private final SeriesScanOptions seriesScanOptions;
+  private final int measurementCount;
+  private int currentDeviceIndex;
 
   private final List<String> measurementColumnNames;
-
   private final List<IMeasurementSchema> measurementSchemas;
-
   private final List<TSDataType> measurementColumnTSDataTypes;
 
-  // TODO calc maxTsBlockLineNum using date_bin
-  private final int maxTsBlockLineNum;
-
-  // for different aggregations aiming to same column, use this variable to 
point to same column
-  private final List<Integer> aggArguments;
-
+  private final Ordering scanOrder;
+  private final SeriesScanOptions seriesScanOptions;
   private QueryDataSource queryDataSource;
 
-  private int currentDeviceIndex;
-
-  ITableTimeRangeIterator timeIterator;
+  private final ITableTimeRangeIterator timeIterator;
 
+  private final boolean canUseStatistics;
+  private final boolean ascending;
   private boolean allAggregatorsHasFinalResult = false;
+  private long leftRuntimeOfOneNextCall;
+
+  private TsBlock inputTsBlock;
 
   public TableAggregationTableScanOperator(
       PlanNodeId sourceId,
-      OperatorContext context,
+      OperatorContext operatorContext,
       List<ColumnSchema> columnSchemas,
       int[] columnsIndexArray,
       List<DeviceEntry> deviceEntries,
@@ -121,36 +123,25 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       SeriesScanOptions seriesScanOptions,
       List<String> measurementColumnNames,
       List<IMeasurementSchema> measurementSchemas,
-      int maxTsBlockLineNum,
       int measurementCount,
       List<TableAggregator> tableAggregators,
       List<ColumnSchema> groupingKeySchemas,
       int[] groupingKeyIndex,
       ITableTimeRangeIterator tableTimeRangeIterator,
       boolean ascending,
-      long maxReturnSize,
       boolean canUseStatistics,
       List<Integer> aggArguments) {
 
-    super(
-        sourceId,
-        context,
-        null,
-        measurementCount,
-        null,
-        null,
-        ascending,
-        false,
-        null,
-        maxReturnSize,
-        canUseStatistics);
+    this.sourceId = sourceId;
+    this.operatorContext = operatorContext;
+    this.canUseStatistics = canUseStatistics;
+    this.ascending = ascending;
+    this.measurementCount = measurementCount;
 
     this.tableAggregators = tableAggregators;
     this.groupingKeySchemas = groupingKeySchemas;
     this.groupingKeyIndex = groupingKeyIndex;
 
-    this.sourceId = sourceId;
-    this.operatorContext = context;
     this.columnSchemas = columnSchemas;
     this.columnsIndexArray = columnsIndexArray;
     this.deviceEntries = deviceEntries;
@@ -164,13 +155,10 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     this.currentDeviceIndex = 0;
     this.aggArguments = aggArguments;
     this.timeIterator = tableTimeRangeIterator;
-    if (tableTimeRangeIterator.getType()
-        == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
-      curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
-    }
 
-    this.maxReturnSize = maxReturnSize;
-    this.maxTsBlockLineNum = maxTsBlockLineNum;
+    this.maxReturnSize = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    this.cachedRawDataSize =
+        (1L + measurementCount) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 
     constructAlignedSeriesScanUtil();
   }
@@ -279,7 +267,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   }
 
   /** Return true if we have the result of this timeRange. */
-  @Override
+  // @Override
   protected boolean calculateAggregationResultForCurrentTimeRange() {
     try {
       if (calcFromCachedData()) {
@@ -342,14 +330,14 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     }
   }
 
-  @Override
+  // @Override
   protected void updateResultTsBlock() {
     appendAggregationResult(resultTsBlockBuilder, tableAggregators);
     // after appendAggregationResult invoked, aggregators must be cleared
     resetTableAggregators();
   }
 
-  @Override
+  // @Override
   protected boolean calcFromCachedData() {
     return calcUsingRawData(inputTsBlock);
   }
@@ -552,7 +540,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   }
 
   @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
-  @Override
+  // @Override
   public boolean readAndCalcFromFile() throws IOException {
     // start stopwatch
     long start = System.nanoTime();
@@ -575,8 +563,8 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
         if (timeIterator
             .getCurTimeRange()
             .contains(fileTimeStatistics.getStartTime(), 
fileTimeStatistics.getEndTime())) {
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
+          Statistics[] statisticsList = new Statistics[measurementCount];
+          for (int i = 0; i < measurementCount; i++) {
             statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
           }
           calcFromStatistics(fileTimeStatistics, statisticsList);
@@ -622,8 +610,8 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
             .getCurTimeRange()
             .contains(chunkTimeStatistics.getStartTime(), 
chunkTimeStatistics.getEndTime())) {
           // calc from chunkMetaData
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
+          Statistics[] statisticsList = new Statistics[measurementCount];
+          for (int i = 0; i < measurementCount; i++) {
             statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
           }
           calcFromStatistics(chunkTimeStatistics, statisticsList);
@@ -644,8 +632,6 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     return false;
   }
 
-  long leftRuntimeOfOneNextCall = Long.MAX_VALUE;
-
   @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
   protected boolean readAndCalcFromPage() throws IOException {
     long start = System.nanoTime();
@@ -671,8 +657,8 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
           if (timeIterator
               .getCurTimeRange()
               .contains(pageTimeStatistics.getStartTime(), 
pageTimeStatistics.getEndTime())) {
-            Statistics[] statisticsList = new Statistics[subSensorSize];
-            for (int i = 0; i < subSensorSize; i++) {
+            Statistics[] statisticsList = new Statistics[measurementCount];
+            for (int i = 0; i < measurementCount; i++) {
               statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
             }
             calcFromStatistics(pageTimeStatistics, statisticsList);
@@ -863,4 +849,21 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
         + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes())
         + RamUsageEstimator.sizeOfCollection(deviceEntries);
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
+        ? cachedRawDataSize
+        : 0;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java
index 1ad01095fac..38d7c2372c2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java
@@ -466,9 +466,7 @@ public class TableModeAccumulator implements 
TableAccumulator {
     for (int i = 0; i < column.getPositionCount(); i++) {
       if (!column.isNull(i)) {
         booleanCountMap.compute(column.getBoolean(i), (k, v) -> v == null ? 1 
: v + 1);
-        if (booleanCountMap.size() > MAP_SIZE_THRESHOLD) {
-          checkMapSize(booleanCountMap.size());
-        }
+        checkMapSize(booleanCountMap.size());
       } else {
         nullCount++;
       }
@@ -531,11 +529,11 @@ public class TableModeAccumulator implements 
TableAccumulator {
   }
 
   private void checkMapSize(int size) {
-    if (size > MAP_SIZE_THRESHOLD) {
+    if (size > 
IoTDBDescriptor.getInstance().getConfig().getModeMapSizeThreshold()) {
       throw new RuntimeException(
           String.format(
               "distinct values has exceeded the threshold %s when calculate Mode",
-              MAP_SIZE_THRESHOLD));
+              
IoTDBDescriptor.getInstance().getConfig().getModeMapSizeThreshold()));
     }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 56d6868a24b..bcf320fe76d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -1643,14 +1643,12 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             scanOptionsBuilder.build(),
             measurementColumnNames,
             measurementSchemas,
-            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
             measurementColumnCount,
             aggregators,
             groupingKeySchemas,
             groupingKeyIndex,
             timeRangeIterator,
             scanAscending,
-            calculateMaxAggregationResultSize(),
             canUseStatistic,
             aggColumnIndexes);
 
@@ -1734,30 +1732,4 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     }
     return new boolean[] {canUseStatistic, isAscending};
   }
-
-  public static long calculateMaxAggregationResultSize(
-      // List<? extends AggregationDescriptor> aggregationDescriptors,
-      // ITimeRangeIterator timeRangeIterator
-      ) {
-    // TODO perfect max aggregation result size logic
-    return 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
-    //    long timeValueColumnsSizePerLine = 
TimeColumn.SIZE_IN_BYTES_PER_POSITION;
-    //    for (AggregationDescriptor descriptor : aggregationDescriptors) {
-    //      List<TSDataType> outPutDataTypes =
-    //              descriptor.getOutputColumnNames().stream()
-    //                      .map(typeProvider::getTableModelType)
-    //                      .collect(Collectors.toList());
-    //      for (TSDataType tsDataType : outPutDataTypes) {
-    //        timeValueColumnsSizePerLine += 
getOutputColumnSizePerLine(tsDataType);
-    //      }
-    //    }
-    //
-    //    return Math.min(
-    //            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
-    //            Math.min(
-    //                    
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
-    //                    timeRangeIterator.getTotalIntervalNum())
-    //                    * timeValueColumnsSizePerLine);
-  }
 }

Reply via email to