This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/last_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04ccdace529f6606877f5f3b676045eafcb6a6b9 Author: Beyyes <[email protected]> AuthorDate: Sat Dec 21 18:33:07 2024 +0800 add impl --- .../source/relational/TableLastQueryOperator.java | 396 +++++++++++++-------- .../relational/aggregation/LastByAccumulator.java | 4 + .../relational/aggregation/TableAggregator.java | 4 + .../plan/planner/TableOperatorGenerator.java | 14 +- 4 files changed, 271 insertions(+), 147 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java index 1489812105d..30aa2933b0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java @@ -24,6 +24,9 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByAccumulator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; @@ -31,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -54,9 +58,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -64,6 +70,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.Utils.serializeTimeValue; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; public class TableLastQueryOperator extends TableAggregationTableScanOperator { @@ -75,40 +82,32 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { TableDeviceSchemaCache.getInstance(); private boolean finished = false; - - private final QualifiedObjectName qualifiedObjectName; - private int lastQueryDeviceIndex; - - private QueryDataSource queryDataSource; - - private SeriesScanOptions initSeriesScanOptions; + private boolean fetchLastCacheForCurrentDevice; + private int outputDeviceIndex; + private DeviceEntry currentDeviceEntry; + private TableAggregationTableScanOperator aggTableScanOperator; + private Map<String, Integer> aggColumnLayout; + private int newChannelCnt = 0; + private List<Integer> newAggColumnsIndexArray; + private final List<Integer> initLastByInputChannels = Arrays.asList(0, -1, -1); + private final List<Integer> initLastInputChannels = Arrays.asList(0, -1); + // the ordinal of uncached columns in initTableAggregators + private List<Integer> unCachedMeasurementToAggregatorIndex = new ArrayList<>(); + private final String dbName; // last_by(x,time) or last(time) - private final boolean[] isLastBy; - private boolean hashLastBy; - private boolean calcCacheForCurrentDevice; + private final boolean[] isLastByArray; + private QueryDataSource queryDataSource; private final List<DeviceEntry> initDeviceEntries; private final List<TableAggregator> initTableAggregators; - private final List<String> initMeasurementColumnNames; - private final Set<String> initAllSensors; - private final List<IMeasurementSchema> initMeasurementSchemas; private final List<ColumnSchema> initAggColumnSchemas; private final int[] initAggColumnsIndexArray; - - // stores all inputChannels of tableAggregators, - // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0] private final List<Integer> initAggregatorInputChannels; - + private final List<String> initMeasurementColumnNames; + private final List<IMeasurementSchema> initMeasurementSchemas; + private SeriesScanOptions initSeriesScanOptions; private final int groupKeySize; - DeviceEntry currentDeviceEntry; - - List<Integer> cacheToAggregatorIndex = new ArrayList<>(); - - TableAggregationTableScanOperator aggTableScanOperator; - - Map<String, Integer> aggColumnLayout; - public TableLastQueryOperator( PlanNodeId sourceId, OperatorContext context, @@ -149,22 +148,20 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { this.initTableAggregators = tableAggregators; this.initAggregatorInputChannels = aggregatorInputChannels; this.initDeviceEntries = deviceEntries; - this.initAggColumnSchemas = aggColumnSchemas; this.initAggColumnsIndexArray = aggColumnsIndexArray; - this.initMeasurementColumnNames = measurementColumnNames; this.initMeasurementSchemas = measurementSchemas; - this.initSeriesScanOptions = seriesScanOptions; - this.initAllSensors = allSensors; - - this.isLastBy = new boolean[tableAggregators.size()]; - // change it later - Arrays.fill(isLastBy, true); - this.qualifiedObjectName = qualifiedObjectName; + this.dbName = qualifiedObjectName.getDatabaseName(); + this.groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); - groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); + this.isLastByArray = new boolean[tableAggregators.size()]; + for (int i = 0; i < tableAggregators.size(); i++) { + if (tableAggregators.get(i).getAccumulator() instanceof LastByAccumulator) { + isLastByArray[i] = true; + } + } } @Override @@ -181,7 +178,7 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { return true; } - return lastQueryDeviceIndex < initDeviceEntries.size(); + return outputDeviceIndex < initDeviceEntries.size(); } @Override @@ -193,9 +190,9 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { return getResultFromRetainedTsBlock(); } - while (System.nanoTime() - start > maxRuntime + while (System.nanoTime() - start < maxRuntime && !resultTsBlockBuilder.isFull() - && lastQueryDeviceIndex < initDeviceEntries.size()) { + && outputDeviceIndex < initDeviceEntries.size()) { processCurrentDevice(); } @@ -209,79 +206,76 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { /** Main process logic, calc the last aggregation results of current device. */ private void processCurrentDevice() throws Exception { + currentDeviceEntry = initDeviceEntries.get(outputDeviceIndex); - // TODO need lock? - currentDeviceEntry = initDeviceEntries.get(lastQueryDeviceIndex); - - if (!calcCacheForCurrentDevice) { - resetArguments(); - - int channelCnt = 0; + if (!fetchLastCacheForCurrentDevice) { + resetAggArguments(); + int initChannel = 0; for (int i = 0; i < initTableAggregators.size(); i++) { TableAggregator aggregator = initTableAggregators.get(i); - if (isLastBy[i]) { - processLastBy(i, channelCnt); + if (isLastByArray[i]) { + processLastBy(i, initChannel); } else { - processLast(channelCnt); + processLast(i, initChannel); } - - channelCnt += aggregator.getChannelCount(); + initChannel += aggregator.getChannelCount(); } - calcCacheForCurrentDevice = true; + fetchLastCacheForCurrentDevice = true; } - if (aggColumnSchemas != null && !aggColumnSchemas.isEmpty()) { - // if (aggTableScanOperator == null) { - // // always add last(time) to tail, update last cache need the value of last(time) - // tableAggregators.add( - // new TableAggregator( - // new LastDescAccumulator(TSDataType.TIMESTAMP), - // tableAggregators.get(0).getStep(), - // TSDataType.TIMESTAMP, - // Arrays.asList(aggColumnLayout.get("time"), aggColumnLayout.get("time")), - // OptionalInt.empty())); - // aggTableScanOperator = - // new TableAggregationTableScanOperator( - // sourceId, - // operatorContext, - // aggColumnSchemas, - // aggColumnsIndexArray, - // Collections.singletonList(currentDeviceEntry), - // seriesScanOptions, - // measurementColumnNames, - // allSensors, - // measurementSchemas, - // tableAggregators, - // Collections.emptyList(), - // null, - // timeIterator, - // false, - // canUseStatistics, - // aggregatorInputChannels); - // } - - // if (calculateAggregationResultForCurrentTimeRange()) { - // - // } - - if (calculateAggregationResultForCurrentTimeRange()) { - // TsBlock updateBlock = aggTableScanOperator.next(); - // if (updateBlock != null && !updateBlock.isEmpty()) { - String dbName = qualifiedObjectName.getDatabaseName(); + if (hasUnCachedColumns()) { + if (aggTableScanOperator == null) { + buildAggTableScanArguments(); + } + + if (!aggTableScanOperator.hasNext()) { + for (int i = 0; i < tableAggregators.size() - 1; i++) { + resultTsBlockBuilder + .getValueColumnBuilders()[groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)] + .appendNull(); + } + outputDeviceIndex++; + fetchLastCacheForCurrentDevice = false; + appendToResultTsBlockBuilder(); + return; + } + TsBlock updateBlock = aggTableScanOperator.next(); + if (updateBlock != null && !updateBlock.isEmpty()) { + int channel = 0; List<String> updateMeasurementList = new ArrayList<>(); List<TimeValuePair> updateTimeValuePairList = new ArrayList<>(); - for (int i = 0; i < tableAggregators.size(); i++) { - // if (!isLastBy[i] || (isLastBy[i] && !updateBlock.getColumn(i).isNull(0))) { - // updateMeasurementList.add(""); - // TimeValuePair tv = - // new TimeValuePair( - // updateBlock.getColumn(0).getLong(0), - // updateBlock.getColumn(i).getTsPrimitiveType(0)); - // updateTimeValuePairList.add(tv); - // } + for (int i = 0; i < tableAggregators.size() - 1; i++) { + if (updateBlock.getColumn(i).isNull(0)) { + resultTsBlockBuilder + .getValueColumnBuilders()[ + groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)] + .appendNull(); + } else { + resultTsBlockBuilder + .getValueColumnBuilders()[ + groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)] + .writeBinary(updateBlock.getColumn(i).getBinary(0)); + } + + boolean isLastBy = + tableAggregators.get(i).getAccumulator() instanceof LastByDescAccumulator; + + if (!isLastBy || !updateBlock.getColumn(i).isNull(0)) { + ColumnSchema schema = aggColumnSchemas.get(aggregatorInputChannels.get(channel)); + if (schema.getColumnCategory() == TsTableColumnCategory.TIME + || schema.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) { + updateMeasurementList.add( + schema.getColumnCategory() == TsTableColumnCategory.TIME ? "" : schema.getName()); + TimeValuePair tv = + new TimeValuePair( + updateBlock.getColumn(tableAggregators.size() - 1).getLong(0), + updateBlock.getColumn(i).getTsPrimitiveType(0)); + updateTimeValuePairList.add(tv); + } + } } String[] updateMeasurementArray = updateMeasurementList.toArray(new String[0]); TimeValuePair[] updateTimeValuePairArray = @@ -295,28 +289,22 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { updateMeasurementArray, updateTimeValuePairArray); - aggTableScanOperator.close(); - aggTableScanOperator = null; - lastQueryDeviceIndex++; + outputDeviceIndex++; + fetchLastCacheForCurrentDevice = false; appendToResultTsBlockBuilder(); - resetArguments(); } } - // } } - private void processLastBy(int aggregatorNum, int initChannel) { + private void processLastBy(int aggregatorNum, int channelNum) { TableAggregator aggregator = initTableAggregators.get(aggregatorNum); - ColumnSchema schema = initAggColumnSchemas.get(initChannel); + ColumnSchema schema = initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum)); String columnName = schema.getName(); if (schema.getColumnCategory() == TsTableColumnCategory.TIME || schema.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) { Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult = TABLE_DEVICE_SCHEMA_CACHE.getLastRow( - qualifiedObjectName.getDatabaseName(), - currentDeviceEntry.getDeviceID(), - "", - Collections.singletonList(columnName)); + dbName, currentDeviceEntry.getDeviceID(), "", Collections.singletonList(columnName)); if (lastByResult.isPresent() && lastByResult.get().getRight()[0] != null) { TsPrimitiveType timeValuePair = lastByResult.get().getRight()[0]; @@ -341,30 +329,38 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { return; } - tableAggregators.add(aggregator); - cacheToAggregatorIndex.add(aggregatorNum); + unCachedMeasurementToAggregatorIndex.add(aggregatorNum); } + TableAggregator newAggregator = + new TableAggregator( + aggregator.getAccumulator(), + aggregator.getStep(), + aggregator.getType(), + initLastByInputChannels, + OptionalInt.empty()); + tableAggregators.add(newAggregator); + // last_by always has three channels for (int i = 0; i < 3; i++) { - schema = initAggColumnSchemas.get(initChannel + i); + schema = initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum + i)); columnName = schema.getName(); if (!aggColumnLayout.containsKey(columnName)) { switch (schema.getColumnCategory()) { case ID: case ATTRIBUTE: - aggColumnsIndexArray[channel] = initAggColumnsIndexArray[initChannel]; + newAggColumnsIndexArray.add(initAggColumnsIndexArray[channelNum]); break; case MEASUREMENT: - aggColumnsIndexArray[channel] = measurementCount; + newAggColumnsIndexArray.add(measurementCount); measurementCount++; measurementColumnNames.add(schema.getName()); measurementSchemas.add( new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); break; case TIME: - aggColumnsIndexArray[channel] = -1; + newAggColumnsIndexArray.add(-1); break; default: throw new IllegalArgumentException( @@ -372,40 +368,144 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { } aggColumnSchemas.add(schema); - aggregatorInputChannels.add(channel); - aggColumnLayout.put(columnName, channel++); + aggregatorInputChannels.add(newChannelCnt); + aggColumnLayout.put(columnName, newChannelCnt++); } else { aggregatorInputChannels.add(aggColumnLayout.get(columnName)); } - aggregator.getInputChannels()[i] = aggColumnLayout.get(columnName); + newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName); } } - private void processLast(int channelCnt) { - ColumnSchema columnSchema = initAggColumnSchemas.get(channelCnt); - String columnName = columnSchema.getName(); + private void processLast(int aggregatorNum, int channelNum) { + TableAggregator aggregator = initTableAggregators.get(aggregatorNum); + ColumnSchema schema = initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum)); + String columnName = schema.getName(); + if (schema.getColumnCategory() == TsTableColumnCategory.TIME + || schema.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) { + TimeValuePair timeValuePair = + TABLE_DEVICE_SCHEMA_CACHE.getLastEntry( + dbName, currentDeviceEntry.getDeviceID(), columnName); + + if (timeValuePair != null) { + ColumnBuilder columnBuilder = + resultTsBlockBuilder.getColumnBuilder(groupKeySize + aggregatorNum); + + if (timeValuePair == EMPTY_TIME_VALUE_PAIR) { + columnBuilder.appendNull(); + } else { + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue( + timeValuePair.getValue().getDataType(), + timeValuePair.getTimestamp(), + false, + timeValuePair.getValue()))); + } else { + columnBuilder.writeTsPrimitiveType(timeValuePair.getValue()); + } + } + return; + } + + unCachedMeasurementToAggregatorIndex.add(aggregatorNum); + } + + TableAggregator newAggregator = + new TableAggregator( + aggregator.getAccumulator(), + aggregator.getStep(), + aggregator.getType(), + initLastInputChannels, + OptionalInt.empty()); + tableAggregators.add(newAggregator); + + // last_by always has two channels + for (int i = 0; i < 2; i++) { + schema = initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum + i)); + columnName = schema.getName(); - if (columnSchema.getColumnCategory() == TsTableColumnCategory.ID - || columnSchema.getColumnCategory() == TsTableColumnCategory.ATTRIBUTE) { if (!aggColumnLayout.containsKey(columnName)) { - newColumnsIndexArray.add(initAggColumnsIndexArray[channelCnt]); + switch (schema.getColumnCategory()) { + case ID: + case ATTRIBUTE: + newAggColumnsIndexArray.add(initAggColumnsIndexArray[channelNum]); + break; + case MEASUREMENT: + newAggColumnsIndexArray.add(measurementCount); + measurementCount++; + measurementColumnNames.add(schema.getName()); + measurementSchemas.add( + new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); + break; + case TIME: + newAggColumnsIndexArray.add(-1); + break; + default: + throw new IllegalArgumentException( + "Unexpected category: " + schema.getColumnCategory()); + } + + aggColumnSchemas.add(schema); + aggregatorInputChannels.add(newChannelCnt); + aggColumnLayout.put(columnName, newChannelCnt++); + } else { + aggregatorInputChannels.add(aggColumnLayout.get(columnName)); } - } else if (columnSchema.getColumnCategory() == TsTableColumnCategory.TIME) { - aggColumnsIndexArray[channel] = -1; - } else { - // String measurement = initMeasurementColumnNames.get(i); - aggColumnsIndexArray[channel] = measurementCount; - measurementCount++; - measurementColumnNames.add(columnSchema.getName()); - measurementSchemas.add( - new MeasurementSchema(columnSchema.getName(), getTSDataType(columnSchema.getType()))); + newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName); } } - List<Integer> newColumnsIndexArray; - int channel = 0; + private boolean hasUnCachedColumns() { + return this.tableAggregators != null && !this.tableAggregators.isEmpty(); + } + + private void buildAggTableScanArguments() { + addLastTimeAggregationToAggregators(); + + aggColumnsIndexArray = newAggColumnsIndexArray.stream().mapToInt(Integer::intValue).toArray(); + allSensors = new HashSet<>(measurementColumnNames); + allSensors.add(""); + + aggTableScanOperator = + new TableAggregationTableScanOperator( + sourceId, + operatorContext, + aggColumnSchemas, + aggColumnsIndexArray, + Collections.singletonList(currentDeviceEntry), + seriesScanOptions, + measurementColumnNames, + allSensors, + measurementSchemas, + tableAggregators, + Collections.emptyList(), + null, + timeIterator, + false, + canUseStatistics, + aggregatorInputChannels); + + aggTableScanOperator.initQueryDataSource(this.queryDataSource); + } + + private void addLastTimeAggregationToAggregators() { + // always add last(time) to tail, update last cache need the value of last(time) + int timeColumnIdx = aggColumnLayout.get("time"); + tableAggregators.add( + new TableAggregator( + new LastDescAccumulator(TSDataType.TIMESTAMP), + AggregationNode.Step.FINAL, + TSDataType.TIMESTAMP, + Collections.singletonList(timeColumnIdx), + OptionalInt.empty())); + + aggregatorInputChannels.add(timeColumnIdx); + aggregatorInputChannels.add(timeColumnIdx); + } private void appendToResultTsBlockBuilder() { ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); @@ -414,7 +514,7 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { if (TsTableColumnCategory.ID == groupingKeySchemas.get(i).getColumnCategory()) { String id = (String) - initDeviceEntries.get(lastQueryDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1); + initDeviceEntries.get(outputDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1); if (id == null) { columnBuilders[i].appendNull(); } else { @@ -423,7 +523,7 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { } else { Binary attribute = initDeviceEntries - .get(lastQueryDeviceIndex) + .get(outputDeviceIndex) .getAttributeColumnValues() .get(groupingKeyIndex[i]); if (attribute == null) { @@ -436,17 +536,25 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { resultTsBlockBuilder.declarePosition(); } - private void resetArguments() { - aggColumnLayout = new HashMap<>(); + private void resetAggArguments() throws Exception { + if (aggTableScanOperator != null) { + aggTableScanOperator.close(); + } + aggTableScanOperator = null; - aggregatorInputChannels = new ArrayList<>(); tableAggregators = new ArrayList<>(); + newChannelCnt = 0; + + aggColumnLayout = new HashMap<>(); + aggColumnSchemas = new ArrayList<>(); + aggregatorInputChannels = new ArrayList<>(); + newAggColumnsIndexArray = new ArrayList<>(); measurementColumnNames = new ArrayList<>(); measurementSchemas = new ArrayList<>(); - newColumnsIndexArray = new ArrayList<>(); - channel = 0; measurementCount = 0; + + unCachedMeasurementToAggregatorIndex.clear(); } private void buildResultTsBlock() { @@ -466,11 +574,11 @@ public class TableLastQueryOperator extends TableAggregationTableScanOperator { DeviceEntry deviceEntry; - if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.lastQueryDeviceIndex) == null) { + if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.outputDeviceIndex) == null) { // for device which is not exist deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList()); } else { - deviceEntry = this.deviceEntries.get(this.lastQueryDeviceIndex); + deviceEntry = this.deviceEntries.get(this.outputDeviceIndex); } AlignedFullPath alignedPath = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java index d06f9413933..2ad6b8dd089 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java @@ -65,6 +65,10 @@ public class LastByAccumulator implements TableAccumulator { this.xResult = TsPrimitiveType.getByType(xDataType); } + public boolean yIsTimeColumn() { + return this.yIsTimeColumn; + } + @Override public long getEstimatedSize() { return INSTANCE_SIZE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java index dd63fa9cdd1..7f18f593f14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAggregator.java @@ -111,6 +111,10 @@ public class TableAggregator { return this.inputChannels; } + public TableAccumulator getAccumulator() { + return this.accumulator; + } + public AggregationNode.Step getStep() { return this.step; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index f2de6f8b7b1..951015be1e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -78,6 +78,8 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.Tabl import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableLastQueryOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator; @@ -1919,7 +1921,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution context.getDriverContext().setInputDriver(true); - if (canUseLastCacheOptimize()) { + if (canUseLastCacheOptimize(aggregators)) { // context add TableLastQueryOperator TableLastQueryOperator lastQueryOperator = new TableLastQueryOperator( @@ -2075,8 +2077,14 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new boolean[] {canUseStatistic, isAscending}; } - private boolean canUseLastCacheOptimize() { - // TODO complete this method + private boolean canUseLastCacheOptimize(List<TableAggregator> aggregators) { + for (TableAggregator aggregator : aggregators) { + if (!(aggregator.getAccumulator() instanceof LastDescAccumulator + || (aggregator.getAccumulator() instanceof LastByDescAccumulator + && ((LastByDescAccumulator) aggregator.getAccumulator()).yIsTimeColumn()))) { + return false; + } + } return true; } }
