This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/last_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ead283f90bfb60431a0fb7fdfbcdce14916a433b Author: Beyyes <[email protected]> AuthorDate: Tue Dec 17 17:46:08 2024 +0800 add basic impl code --- .../TableAggregationTableScanOperator.java | 26 ++--- .../source/relational/TableLastQueryOperator.java | 124 +++++++++++++++++++-- .../plan/planner/TableOperatorGenerator.java | 97 +++++++++------- 3 files changed, 187 insertions(+), 60 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index b9f197adbfc..8eb700770d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -97,11 +97,9 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private final List<IMeasurementSchema> measurementSchemas; private final List<TSDataType> measurementColumnTSDataTypes; - // TODO calc maxTsBlockLineNum using date_bin - private final int maxTsBlockLineNum; - - // for different aggregations aiming to same column, use this variable to point to same column - private final List<Integer> aggArguments; + // stores all inputChannels of tableAggregators, + // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0] + private final List<Integer> aggregatorInputChannels; private QueryDataSource queryDataSource; @@ -120,7 +118,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation List<String> measurementColumnNames, Set<String> allSensors, List<IMeasurementSchema> measurementSchemas, - int maxTsBlockLineNum, int measurementCount, List<TableAggregator> tableAggregators, List<ColumnSchema> groupingKeySchemas, @@ -129,7 +126,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation boolean ascending, long maxReturnSize, boolean canUseStatistics, - List<Integer> aggArguments) { + List<Integer> aggregatorInputChannels) { super( sourceId, @@ -165,7 +162,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); - this.aggArguments = aggArguments; + this.aggregatorInputChannels = aggregatorInputChannels; this.timeIterator = tableTimeRangeIterator; if (tableTimeRangeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { @@ -173,7 +170,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } this.maxReturnSize = maxReturnSize; - this.maxTsBlockLineNum = maxTsBlockLineNum; constructAlignedSeriesScanUtil(); } @@ -409,8 +405,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); - Column[] valueColumns = new Column[aggArguments.size()]; - for (int idx : aggArguments) { + Column[] valueColumns = new Column[aggregatorInputChannels.size()]; + for (int idx : aggregatorInputChannels) { if (valueColumns[idx] != null) { continue; } @@ -496,10 +492,13 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation idx++; TsTableColumnCategory columnSchemaCategory = - columnSchemas.get(aggArguments.get(idx)).getColumnCategory(); + columnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); statisticsArray[i] = buildStatistics( - columnSchemaCategory, timeStatistics, valueStatistics, aggArguments.get(idx)); + columnSchemaCategory, + timeStatistics, + valueStatistics, + aggregatorInputChannels.get(idx)); } aggregator.processStatistics(statisticsArray); @@ -855,7 +854,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation this.queryDataSource = (QueryDataSource) dataSource; this.seriesScanUtil.initQueryDataSource(queryDataSource); this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); - this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java index 20efe5e7b19..481e575340c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java @@ -23,38 +23,86 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSou import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR; public class TableLastQueryOperator extends AbstractDataSourceOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableLastQueryOperator.class); + private static final TableDeviceSchemaCache TABLE_DEVICE_SCHEMA_CACHE = + TableDeviceSchemaCache.getInstance(); + private boolean finished = false; // TODO not need all table aggregators when match last cache private final List<TableAggregator> tableAggregators; + private List<TableAggregator> unCachedTableAggregators; private final List<ColumnSchema> groupingKeySchemas; + private final QualifiedObjectName qualifiedObjectName; private final List<DeviceEntry> deviceEntries; private int currentDeviceIndex; + private final List<String> measurementColumnNames; + private final List<IMeasurementSchema> measurementSchemas; + private final List<TSDataType> measurementColumnTSDataTypes; + + private QueryDataSource queryDataSource; + + // last_by(x,time) or last(time) + private final boolean[] isLastBy; + private final List<String> lastColumns; + private final List<String> lastByColumns; + private final int[] indexOfLastColumnInAggregators; + private final int[] indexOfLastByColumnInAggregators; + private boolean hashLastBy; + private boolean needCacheTimeColumn; + private boolean calcCacheForCurrentDevice; + private List<String> currentUnCacheMeasurements = new ArrayList<>(); public TableLastQueryOperator( List<TableAggregator> tableAggregators, List<ColumnSchema> groupingKeySchemas, - List<DeviceEntry> deviceEntries) { + QualifiedObjectName qualifiedObjectName, + List<DeviceEntry> deviceEntries, + List<String> measurementColumnNames, + List<IMeasurementSchema> measurementSchemas, + List<TSDataType> measurementColumnTSDataTypes) { this.tableAggregators = tableAggregators; this.groupingKeySchemas = groupingKeySchemas; + this.qualifiedObjectName = qualifiedObjectName; this.deviceEntries = deviceEntries; + this.measurementColumnNames = measurementColumnNames; + this.measurementSchemas = measurementSchemas; + this.measurementColumnTSDataTypes = measurementColumnTSDataTypes; + this.isLastBy = new boolean[tableAggregators.size()]; + + this.lastColumns = new ArrayList<>(); + this.lastByColumns = new ArrayList<>(); + this.indexOfLastColumnInAggregators = new int[lastColumns.size()]; + this.indexOfLastByColumnInAggregators = new int[lastByColumns.size()]; } @Override @@ -83,10 +131,10 @@ public class TableLastQueryOperator extends AbstractDataSourceOperator { return getResultFromRetainedTsBlock(); } - while (!resultTsBlockBuilder.isFull()) { - if (processFinished() || System.nanoTime() - start > maxRuntime) { - break; - } + while (System.nanoTime() - start > maxRuntime + && !resultTsBlockBuilder.isFull() + && currentDeviceIndex < deviceEntries.size()) { + processCurrentDevice(); } if (resultTsBlockBuilder.isEmpty()) { @@ -97,9 +145,64 @@ public class TableLastQueryOperator extends AbstractDataSourceOperator { return checkTsBlockSizeAndGetResult(); } - private boolean processFinished() { + /** Main process logic, calc the last aggregation results of current device. */ + private void processCurrentDevice() { + // calc indexes... + // consider last(time), last(id), last(attr), last(measurement), last_by(xx) + + // TODO need lock? + DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex); + + if (!calcCacheForCurrentDevice) { + if (!lastByColumns.isEmpty()) { + Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult = + TABLE_DEVICE_SCHEMA_CACHE.getLastRow( + qualifiedObjectName.getDatabaseName(), + currentDeviceEntry.getDeviceID(), + "", + lastByColumns); + if (!lastByResult.isPresent()) { + // all missed + + } else { + + } + // TODO verify id and attr columns + } + + if (!lastColumns.isEmpty()) { + for (int i = 0; i < lastColumns.size(); i++) { + String measurement = lastByColumns.get(i); + TimeValuePair timeValuePair = + TABLE_DEVICE_SCHEMA_CACHE.getLastEntry( + qualifiedObjectName.getDatabaseName(), + currentDeviceEntry.getDeviceID(), + measurement); + if (timeValuePair == null) { + currentUnCacheMeasurements.add(measurement); + } else { + ColumnBuilder columnBuilder = + resultTsBlockBuilder.getColumnBuilder(indexOfLastColumnInAggregators[i]); + if (timeValuePair == EMPTY_TIME_VALUE_PAIR) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeTsPrimitiveType(timeValuePair.getValue()); + } + } + } + } + + calcCacheForCurrentDevice = true; + } + + // read last value from File or MemTable, update last cache + if (!currentUnCacheMeasurements.isEmpty()) { + // TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(); - return true; + // TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists + + // init SeriesOptions + } } private void buildResultTsBlock() { @@ -128,6 +231,13 @@ public class TableLastQueryOperator extends AbstractDataSourceOperator { return resultDataTypes; } + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + this.queryDataSource = (QueryDataSource) dataSource; + this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + } + @Override public long calculateMaxPeekMemory() { // TODO diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index cbb7e6c7577..40866a9ce30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -378,21 +378,18 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } } - SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate().isPresent() - ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) - : new SeriesScanOptions.Builder(); - scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); - scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); - scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); - scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); - - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter( - pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); - } + SeriesScanOptions seriesScanOptions = + buildSeriesScanOptions( + context, + columnSchemaMap, + measurementColumnNames, + measurementColumnsIndexMap, + timeColumnName, + node.getTimePredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.getPushDownPredicate()); OperatorContext operatorContext = context @@ -421,7 +418,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution columnsIndexArray, node.getDeviceEntries(), node.getScanOrder(), - scanOptionsBuilder.build(), + seriesScanOptions, measurementColumnNames, allSensors, measurementSchemas, @@ -1753,7 +1750,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution int distinctArgumentCount = node.getAssignments().size(); int aggregationsCount = node.getAggregations().size(); - List<Integer> aggColumnIndexes = new ArrayList<>(); + List<Integer> aggregatorInputChannels = new ArrayList<>(); int channel = 0; int measurementColumnCount = 0; Map<Symbol, Integer> idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap(); @@ -1804,10 +1801,10 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } if (!columnLayout.containsKey(symbol)) { - aggColumnIndexes.add(channel); + aggregatorInputChannels.add(channel); columnLayout.put(symbol, channel++); } else { - aggColumnIndexes.add(columnLayout.get(symbol)); + aggregatorInputChannels.add(columnLayout.get(symbol)); } } } @@ -1894,25 +1891,21 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution context.getNextOperatorId(), node.getPlanNodeId(), TableAggregationTableScanOperator.class.getSimpleName()); - SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate().isPresent() - ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) - : new SeriesScanOptions.Builder(); - scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); - scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); - scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); - scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter( - pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); - } + SeriesScanOptions seriesScanOptions = + buildSeriesScanOptions( + context, + columnSchemaMap, + measurementColumnNames, + measurementColumnsIndexMap, + timeColumnName, + node.getTimePredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.getPushDownPredicate()); Set<String> allSensors = new HashSet<>(measurementColumnNames); - // for time column - allSensors.add(""); - + allSensors.add(""); // for time column TableAggregationTableScanOperator aggTableScanOperator = new TableAggregationTableScanOperator( node.getPlanNodeId(), @@ -1921,11 +1914,10 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution columnsIndexArray, node.getDeviceEntries(), scanAscending ? Ordering.ASC : Ordering.DESC, - scanOptionsBuilder.build(), + seriesScanOptions, measurementColumnNames, allSensors, measurementSchemas, - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), measurementColumnCount, aggregators, groupingKeySchemas, @@ -1934,7 +1926,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution scanAscending, calculateMaxAggregationResultSize(), canUseStatistic, - aggColumnIndexes); + aggregatorInputChannels); ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); @@ -1957,6 +1949,33 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return aggTableScanOperator; } + private SeriesScanOptions buildSeriesScanOptions( + LocalExecutionPlanContext context, + Map<Symbol, ColumnSchema> columnSchemaMap, + List<String> measurementColumnNames, + Map<String, Integer> measurementColumnsIndexMap, + String timeColumnName, + Optional<Expression> timePredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + Expression pushDownPredicate) { + SeriesScanOptions.Builder scanOptionsBuilder = + timePredicate + .map(expression -> getSeriesScanOptionsBuilder(context, expression)) + .orElseGet(SeriesScanOptions.Builder::new); + scanOptionsBuilder.withPushDownLimit(pushDownLimit); + scanOptionsBuilder.withPushDownOffset(pushDownOffset); + scanOptionsBuilder.withPushLimitToEachDevice(pushLimitToEachDevice); + scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); + if (pushDownPredicate != null) { + scanOptionsBuilder.withPushDownFilter( + convertPredicateToFilter( + pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); + } + return scanOptionsBuilder.build(); + } + @Override public Operator visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecutionPlanContext context) { Operator operator = node.getChild().accept(this, context);
