This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/last_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 89a16f360a3676ce268e0d5adf58f1d0da201c40 Author: Beyyes <[email protected]> AuthorDate: Tue Dec 17 11:05:07 2024 +0800 fix --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../TableAggregationTableScanOperator.java | 15 +- .../source/relational/TableLastQueryOperator.java | 151 +++++++++++++++++++++ .../plan/planner/OperatorTreeGenerator.java | 2 +- .../plan/planner/TableOperatorGenerator.java | 14 +- 5 files changed, 166 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 47707e53170..de55094dab4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -638,7 +638,7 @@ public class IoTDBConfig { private long cacheFileReaderClearPeriod = 100000; /** the max executing time of query in ms. Unit: millisecond */ - private long queryTimeoutThreshold = 60000; + private long queryTimeoutThreshold = 60000_0000; /** the max time to live of a session in ms. Unit: millisecond */ private int sessionTimeoutThreshold = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index e963963bf7d..b9f197adbfc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -49,7 +49,6 @@ import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.BinaryColumn; -import org.apache.tsfile.read.common.block.column.LongColumn; import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -68,6 +67,7 @@ import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; @@ -82,25 +82,19 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private final List<ColumnSchema> groupingKeySchemas; private final int[] groupingKeyIndex; - public static final LongColumn TIME_COLUMN_TEMPLATE = - new LongColumn(1, Optional.empty(), new long[] {0}); - private final List<ColumnSchema> columnSchemas; - private final int[] columnsIndexArray; private final List<DeviceEntry> deviceEntries; - private final int deviceCount; + private int currentDeviceIndex; private final Ordering scanOrder; private final SeriesScanOptions seriesScanOptions; private final List<String> measurementColumnNames; private final Set<String> allSensors; - private final List<IMeasurementSchema> measurementSchemas; - private final List<TSDataType> measurementColumnTSDataTypes; // TODO calc maxTsBlockLineNum using date_bin @@ -111,8 +105,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private QueryDataSource queryDataSource; - private int currentDeviceIndex; - ITableTimeRangeIterator timeIterator; private boolean allAggregatorsHasFinalResult = false; @@ -192,9 +184,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation finished = !hasNextWithTimer(); } return finished; - - // return (retainedTsBlock == null) - // && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java new file mode 100644 index 00000000000..20efe5e7b19 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +public class TableLastQueryOperator extends AbstractDataSourceOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableLastQueryOperator.class); + + private boolean finished = false; + // TODO not need all table aggregators when match last cache + private final List<TableAggregator> tableAggregators; + private final List<ColumnSchema> groupingKeySchemas; + + private final List<DeviceEntry> deviceEntries; + private int currentDeviceIndex; + + public TableLastQueryOperator( + List<TableAggregator> tableAggregators, + List<ColumnSchema> groupingKeySchemas, + List<DeviceEntry> deviceEntries) { + this.tableAggregators = tableAggregators; + this.groupingKeySchemas = groupingKeySchemas; + this.deviceEntries = deviceEntries; + } + + @Override + public boolean isFinished() throws Exception { + if (!finished) { + finished = !hasNextWithTimer(); + } + return finished; + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + + return currentDeviceIndex < deviceEntries.size(); + } + + @Override + public TsBlock next() throws Exception { + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + if (retainedTsBlock != null) { + return getResultFromRetainedTsBlock(); + } + + while (!resultTsBlockBuilder.isFull()) { + if (processFinished() || System.nanoTime() - start > maxRuntime) { + break; + } + } + + if (resultTsBlockBuilder.isEmpty()) { + return null; + } + + buildResultTsBlock(); + return checkTsBlockSizeAndGetResult(); + } + + private boolean processFinished() { + + return true; + } + + private void buildResultTsBlock() { + resultTsBlock = + resultTsBlockBuilder.build( + new RunLengthEncodedColumn( + TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount())); + resultTsBlockBuilder.reset(); + } + + @Override + protected List<TSDataType> getResultDataTypes() { + int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; + List<TSDataType> resultDataTypes = new ArrayList<>(groupingKeySize + tableAggregators.size()); + + if (groupingKeySchemas != null) { + for (int i = 0; i < groupingKeySchemas.size(); i++) { + resultDataTypes.add(TSDataType.STRING); + } + } + + for (TableAggregator aggregator : tableAggregators) { + resultDataTypes.add(aggregator.getType()); + } + + return resultDataTypes; + } + + @Override + public long calculateMaxPeekMemory() { + // TODO + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 9776f3fee68..e3fcf32221e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -3056,8 +3056,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP final MeasurementPath measurementPath = devicePath.concatAsMeasurementPath(measurementList.get(i)); TimeValuePair timeValuePair = null; + context.dataNodeQueryContext.lock(); try { - context.dataNodeQueryContext.lock(); if (!context.dataNodeQueryContext.unCached(measurementPath)) { timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath); if (timeValuePair == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 7dcdbe04c95..cbb7e6c7577 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -1755,7 +1755,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution int aggregationsCount = node.getAggregations().size(); List<Integer> aggColumnIndexes = new ArrayList<>(); int channel = 0; - int idx = -1; int measurementColumnCount = 0; Map<Symbol, Integer> idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap(); Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments(); @@ -1770,7 +1769,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution AggregationNode.Aggregation aggregation = entry.getValue(); for (Expression argument : aggregation.getArguments()) { - idx++; Symbol symbol = Symbol.from(argument); ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol), symbol + " is null"); switch (schema.getColumnCategory()) { @@ -1914,6 +1912,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution Set<String> allSensors = new HashSet<>(measurementColumnNames); // for time column allSensors.add(""); + TableAggregationTableScanOperator aggTableScanOperator = new TableAggregationTableScanOperator( node.getPlanNodeId(), @@ -1939,7 +1938,11 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); - for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { + if (canUseLastCacheOptimize()) { + // context add TableLastQueryOperator + } + + for (int i = 0; i < node.getDeviceEntries().size(); i++) { AlignedFullPath alignedPath = constructAlignedPath( node.getDeviceEntries().get(i), @@ -2036,6 +2039,11 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new boolean[] {canUseStatistic, isAscending}; } + private boolean canUseLastCacheOptimize() { + // TODO complete this method + return true; + } + public static long calculateMaxAggregationResultSize( // List<? extends AggregationDescriptor> aggregationDescriptors, // ITimeRangeIterator timeRangeIterator
