This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 911a5879e0c39b83f4a25883227813d2125ed282 Merge: 8cec7b97bc7 fde91579d17 Author: JackieTien97 <[email protected]> AuthorDate: Wed May 29 19:52:07 2024 +0800 merge master .asf.yaml | 2 +- .mvn/wrapper/maven-wrapper.properties | 4 +- .../iotdb/it/env/cluster/config/MppJVMConfig.java | 18 + .../it/env/cluster/node/AbstractNodeWrapper.java | 1 + .../it/env/cluster/node/ConfigNodeWrapper.java | 1 + .../iotdb/it/framework/IoTDBTestReporter.java | 9 +- .../org/apache/iotdb/db/it/IoTDBFlushQueryIT.java | 7 +- ...oTDBAlignByDeviceWithTemplateAggregationIT.java | 542 +++++++++++++++++++++ .../it/query/IoTDBSelectCompareExpressionIT.java | 10 +- .../apache/iotdb/it/framework/IoTDBTestRunner.java | 3 + .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 2 +- .../pipe/it/autocreate/IoTDBPipeClusterIT.java | 3 +- .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 4 +- .../assembly/resources/sbin/remove-confignode.bat | 4 +- .../heartbeat/DataNodeHeartbeatHandler.java | 6 +- .../response/pipe/task/PipeTableResp.java | 95 ++-- .../pipe/agent/task/PipeConfigNodeTaskAgent.java | 48 +- .../runtime/PipeRuntimeCoordinator.java | 10 +- .../runtime/heartbeat/PipeHeartbeat.java | 32 +- .../runtime/heartbeat/PipeHeartbeatParser.java | 15 +- .../runtime/heartbeat/PipeHeartbeatScheduler.java | 12 +- .../extractor/ConfigRegionListeningFilter.java | 2 +- .../manager/pipe/metric/PipeConfigNodeMetrics.java | 2 + .../metric/PipeConfigNodeRemainingTimeMetrics.java | 9 + .../PipeConfigNodeRemainingTimeOperator.java | 8 +- .../metric/PipeConfigRegionExtractorMetrics.java | 14 + .../pipe/metric/PipeTemporaryMetaMetrics.java | 175 +++++++ .../confignode/persistence/pipe/PipeInfo.java | 75 ++- .../confignode/persistence/pipe/PipeTaskInfo.java | 5 +- .../consensus/response/pipe/PipeTableRespTest.java | 4 +- .../assembly/resources/sbin/remove-datanode.bat | 4 +- .../src/assembly/resources/sbin/remove-datanode.sh | 2 +- .../org/apache/iotdb/db/audit/AuditLogger.java | 6 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 77 ++- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 79 +-- .../client/IoTDBDataNodeAsyncClientManager.java | 2 +- .../IoTDBDataNodeCacheLeaderClientManager.java | 4 + .../client/IoTDBDataNodeSyncClientManager.java | 2 +- .../PipeTransferTabletInsertNodeEventHandler.java | 6 +- .../thrift/sync/IoTDBDataRegionSyncConnector.java | 5 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 9 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 16 +- .../schemaregion/SchemaRegionListeningFilter.java | 2 +- .../PipeDataNodeRemainingEventAndTimeMetrics.java | 11 + .../common/header/ColumnHeaderConstant.java | 6 +- .../filter/AbstractMonthIntervalFillFilter.java | 5 - .../fill/filter/MonthIntervalMSFillFilter.java | 8 +- .../fill/filter/MonthIntervalNSFillFilter.java | 12 +- .../fill/filter/MonthIntervalUSFillFilter.java | 10 +- .../process/relational/StreamSortOperator.java | 3 +- .../db/queryengine/plan/analyze/Analysis.java | 16 +- .../queryengine/plan/analyze/AnalyzeVisitor.java | 10 +- .../plan/analyze/ExpressionTypeAnalyzer.java | 10 +- .../plan/analyze/TemplatedAggregationAnalyze.java | 260 ++++++++++ .../queryengine/plan/analyze/TemplatedAnalyze.java | 156 +++--- .../db/queryengine/plan/analyze/TemplatedInfo.java | 215 ++++++-- .../execution/config/sys/pipe/ShowPipeTask.java | 45 +- .../plan/optimization/AggregationPushDown.java | 177 ++++++- .../plan/optimization/PredicatePushDown.java | 16 +- .../plan/planner/LogicalPlanBuilder.java | 14 +- .../plan/planner/LogicalPlanVisitor.java | 2 +- .../plan/planner/OperatorTreeGenerator.java | 188 +++++-- .../plan/planner/SubPlanTypeExtractor.java | 9 + .../plan/planner/TemplatedLogicalPlan.java | 257 +++++++++- .../plan/planner/TemplatedLogicalPlanBuilder.java | 69 ++- .../planner/distribution/DistributionPlanner.java | 1 + .../plan/planner/plan/PlanFragment.java | 6 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 2 +- .../plan/planner/plan/node/PlanNodeType.java | 8 +- .../planner/plan/node/process/DeviceViewNode.java | 41 ++ .../plan/planner/plan/node/process/FilterNode.java | 59 ++- .../plan/node/process/RawDataAggregationNode.java | 20 + .../plan/node/process/SingleDeviceViewNode.java | 2 +- .../source/AlignedSeriesAggregationScanNode.java | 70 ++- .../plan/parameter/AggregationDescriptor.java | 2 +- .../iotdb/db/storageengine/StorageEngine.java | 3 +- .../execute/task/AbstractCompactionTask.java | 11 + .../execute/task/CrossSpaceCompactionTask.java | 19 + .../execute/task/InnerSpaceCompactionTask.java | 21 + .../compaction/schedule/CompactionScheduler.java | 34 +- .../compaction/schedule/CompactionTaskManager.java | 13 +- .../compaction/schedule/CompactionTaskQueue.java | 2 + .../aggregation/TimeRangeIteratorTest.java | 229 ++++----- .../plan/optimization/TestPlanBuilder.java | 3 +- .../logical/DataQueryLogicalPlannerTest.java | 18 +- .../planner/node/process/FilterNodeSerdeTest.java | 3 +- .../compaction/CompactionOverlapCheckTest.java | 6 + .../FastInnerCompactionPerformerTest.java | 7 + .../resources/conf/iotdb-common.properties | 2 +- .../config/constant/PipeConnectorConstant.java | 2 +- .../config/constant/PipeExtractorConstant.java | 12 +- .../iotdb/commons/pipe/task/meta/PipeMeta.java | 9 +- .../commons/pipe/task/meta/PipeTemporaryMeta.java | 33 +- .../iotdb/commons/service/metric/enums/Metric.java | 2 + .../src/main/thrift/confignode.thrift | 2 + .../src/main/thrift/datanode.thrift | 4 + pom.xml | 4 +- 98 files changed, 2912 insertions(+), 597 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 1577ee98eda,e792f580981..285faba2011 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@@ -110,6 -111,11 +111,11 @@@ public class PipeRawTabletInsertionEven @Override public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { allocatedMemoryBlock.close(); + + // Record the deviceId before the memory is released, + // for later possibly updating the leader cache. - deviceId = tablet.deviceId; ++ deviceId = tablet.getDeviceId(); + // Actually release the occupied memory. tablet = null; dataContainer = null; @@@ -183,7 -189,8 +189,8 @@@ } public String getDeviceId() { - return tablet.getDeviceId(); + // NonNull indicates that the internallyDecreaseResourceReferenceCount has not been called. - return Objects.nonNull(tablet) ? tablet.deviceId : deviceId; ++ return Objects.nonNull(tablet) ? tablet.getDeviceId() : deviceId; } /////////////////////////// TabletInsertionEvent /////////////////////////// diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java index d85ca438479,00000000000..1e1afe99e22 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java @@@ -1,381 -1,0 +1,382 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.relational; + - import com.google.common.util.concurrent.ListenableFuture; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; +import org.apache.iotdb.db.utils.datastructure.MergeSortHeap; +import org.apache.iotdb.db.utils.datastructure.MergeSortKey; +import org.apache.iotdb.db.utils.datastructure.SortKey; +import org.apache.iotdb.db.utils.sort.DiskSpiller; +import org.apache.iotdb.db.utils.sort.MemoryReader; +import org.apache.iotdb.db.utils.sort.SortBufferManager; +import org.apache.iotdb.db.utils.sort.SortReader; ++ ++import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE; + +public class StreamSortOperator implements ProcessOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SortOperator.class); + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final TsBlockBuilder tsBlockBuilder; + + // Use to output the result in memory. + // Because the memory may be larger than tsBlockBuilder's max size + // so the data may be return in multiple times. + private int curRow = -1; + + private List<SortKey> cachedData; + private final Comparator<SortKey> comparator; + private long cachedBytes; + private final DiskSpiller diskSpiller; + private final SortBufferManager sortBufferManager; + + // For mergeSort + + private MergeSortHeap mergeSortHeap; + private List<SortReader> sortReaders; + private boolean[] noMoreData; + + private static final Logger logger = LoggerFactory.getLogger(SortOperator.class); + + private final int maxReturnSize = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + + private long prepareUntilReadyCost = 0; + private long dataSize = 0; + private long sortCost = 0; + + public StreamSortOperator( + OperatorContext operatorContext, + Operator inputOperator, + List<TSDataType> dataTypes, + String folderPath, + Comparator<SortKey> comparator) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.tsBlockBuilder = new TsBlockBuilder(dataTypes); + this.cachedData = new ArrayList<>(); + this.comparator = comparator; + this.cachedBytes = 0; + this.diskSpiller = + new DiskSpiller(folderPath, folderPath + operatorContext.getOperatorId(), dataTypes); + this.sortBufferManager = new SortBufferManager(); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + return inputOperator.isBlocked(); + } + + @Override + public TsBlock next() throws Exception { + if (!inputOperator.hasNextWithTimer()) { + if (diskSpiller.hasSpilledData()) { + try { + prepareSortReaders(); + return mergeSort(); + } catch (Exception e) { + clear(); + throw e; + } + } else { + if (curRow == -1) { + long startTime = System.nanoTime(); + cachedData.sort(comparator); + sortCost += System.nanoTime() - startTime; + curRow = 0; + } + return buildTsBlockInMemory(); + } + } + long startTime = System.nanoTime(); + try { + TsBlock tsBlock = inputOperator.nextWithTimer(); + if (tsBlock == null) { + return null; + } + dataSize += tsBlock.getRetainedSizeInBytes(); + cacheTsBlock(tsBlock); + } catch (IoTDBException e) { + clear(); + throw e; + } finally { + prepareUntilReadyCost += System.nanoTime() - startTime; + } + + return null; + } + + private void recordMetrics() { + operatorContext.recordSpecifiedInfo("prepareCost/ns", Long.toString(prepareUntilReadyCost)); + operatorContext.recordSpecifiedInfo("sortedDataSize", Long.toString(dataSize)); + operatorContext.recordSpecifiedInfo("sortCost/ns", Long.toString(sortCost)); + int spilledFileSize = diskSpiller.getFileSize(); + if (spilledFileSize > 0) { + operatorContext.recordSpecifiedInfo( + "merge sort branch", Integer.toString(diskSpiller.getFileSize() + 1)); + } + } + + private void prepareSortReaders() throws IoTDBException { + if (sortReaders != null) { + return; + } + sortReaders = new ArrayList<>(); + if (cachedBytes != 0) { + cachedData.sort(comparator); + if (sortBufferManager.allocate(cachedBytes)) { + sortReaders.add( + new MemoryReader( + cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList()))); + } else { + sortBufferManager.allocateOneSortBranch(); + diskSpiller.spillSortedData(cachedData); + cachedData = null; + } + } + sortReaders.addAll(diskSpiller.getReaders(sortBufferManager)); + // if reader is finished + noMoreData = new boolean[sortReaders.size()]; + } + + private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException { + long bytesSize = tsBlock.getRetainedSizeInBytes(); + if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) { + cachedBytes += bytesSize; + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + cachedData.add(new MergeSortKey(tsBlock, i)); + } + } else { + cachedData.sort(comparator); + spill(); + cachedData.clear(); + cachedBytes = bytesSize; + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + cachedData.add(new MergeSortKey(tsBlock, i)); + } + } + } + + private void spill() throws IoTDBException { + // if current memory cannot put this tsBlock, an exception will be thrown in spillSortedData() + // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for + // one branch. + sortBufferManager.allocateOneSortBranch(); + diskSpiller.spillSortedData(cachedData); + } + + private TsBlock buildTsBlockInMemory() { + tsBlockBuilder.reset(); + TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); + ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); + for (int i = curRow; i < cachedData.size(); i++) { + SortKey sortKey = cachedData.get(i); + TsBlock tsBlock = sortKey.tsBlock; + timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex)); + for (int j = 0; j < valueColumnBuilders.length; j++) { + if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) { + valueColumnBuilders[j].appendNull(); + continue; + } + valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex); + } + tsBlockBuilder.declarePosition(); + curRow++; + if (tsBlockBuilder.isFull()) { + break; + } + } + + return tsBlockBuilder.build(); + } + + private TsBlock mergeSort() throws IoTDBException { + + // 1. fill the input from each reader + initMergeSortHeap(); + + long startTime = System.nanoTime(); + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + + // 2. do merge sort until one TsBlock is consumed up + tsBlockBuilder.reset(); + TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); + ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); + while (!mergeSortHeap.isEmpty()) { + + MergeSortKey mergeSortKey = mergeSortHeap.poll(); + TsBlock targetBlock = mergeSortKey.tsBlock; + timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex)); + for (int i = 0; i < valueColumnBuilders.length; i++) { + if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) { + valueColumnBuilders[i].appendNull(); + continue; + } + valueColumnBuilders[i].write(targetBlock.getColumn(i), mergeSortKey.rowIndex); + } + tsBlockBuilder.declarePosition(); + + int readerIndex = mergeSortKey.inputChannelIndex; + mergeSortKey = readNextMergeSortKey(readerIndex); + if (mergeSortKey != null) { + mergeSortHeap.push(mergeSortKey); + } else { + noMoreData[readerIndex] = true; + sortBufferManager.releaseOneSortBranch(); + } + + // break if time is out or tsBlockBuilder is full or sortBuffer is not enough + if (System.nanoTime() - startTime > maxRuntime || tsBlockBuilder.isFull()) { + break; + } + } + sortCost += System.nanoTime() - startTime; + return tsBlockBuilder.build(); + } + + private void initMergeSortHeap() throws IoTDBException { + if (mergeSortHeap == null) { + mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator); + for (int i = 0; i < sortReaders.size(); i++) { + SortReader sortReader = sortReaders.get(i); + if (sortReader.hasNext()) { + MergeSortKey mergeSortKey = sortReader.next(); + mergeSortKey.inputChannelIndex = i; + mergeSortHeap.push(mergeSortKey); + } else { + noMoreData[i] = true; + sortBufferManager.releaseOneSortBranch(); + } + } + } + } + + private MergeSortKey readNextMergeSortKey(int readerIndex) throws IoTDBException { + SortReader sortReader = sortReaders.get(readerIndex); + if (sortReader.hasNext()) { + MergeSortKey mergeSortKey = sortReader.next(); + mergeSortKey.inputChannelIndex = readerIndex; + return mergeSortKey; + } + return null; + } + + private boolean hasMoreData() { + if (noMoreData == null) { + return true; + } + for (boolean noMore : noMoreData) { + if (!noMore) { + return true; + } + } + return false; + } + + public void clear() { + if (!diskSpiller.hasSpilledData()) { + return; + } + try { + if (sortReaders != null) { + for (SortReader sortReader : sortReaders) { + sortReader.close(); + } + } + } catch (Exception e) { + logger.error("Fail to close fileChannel", e); + } + } + + @Override + public boolean hasNext() throws Exception { + return inputOperator.hasNextWithTimer() + || (!diskSpiller.hasSpilledData() && curRow != cachedData.size()) + || (diskSpiller.hasSpilledData() && hasMoreData()); + } + + @Override + public void close() throws Exception { + recordMetrics(); + cachedData = null; + clear(); + inputOperator.close(); + } + + @Override + public boolean isFinished() throws Exception { + return !this.hasNextWithTimer(); + } + + @Override + public long calculateMaxPeekMemory() { + return inputOperator.calculateMaxPeekMemoryWithCounter() + + inputOperator.calculateRetainedSizeAfterCallingNext() + + SORT_BUFFER_SIZE; + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return inputOperator.calculateRetainedSizeAfterCallingNext() + SORT_BUFFER_SIZE; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + RamUsageEstimator.sizeOf(noMoreData) + + tsBlockBuilder.getRetainedSizeInBytes(); + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index da056f9ec36,ae1e7665800..68d2f5d00f6 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@@ -530,9 -532,8 +537,9 @@@ public class OperatorTreeGenerator exte @Override public Operator visitSeriesAggregationScan( SeriesAggregationScanNode node, LocalExecutionPlanContext context) { - PartialPath seriesPath = node.getSeriesPath(); + NonAlignedFullPath seriesPath = + (NonAlignedFullPath) IFullPath.convertToIFullPath(node.getSeriesPath()); - boolean ascending = node.getScanOrder() == Ordering.ASC; + boolean ascending = node.getScanOrder() == ASC; List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList(); List<Aggregator> aggregators = new ArrayList<>(); aggregationDescriptors.forEach( @@@ -606,12 -607,51 +613,53 @@@ @Override public Operator visitAlignedSeriesAggregationScan( AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) { - AlignedFullPath seriesPath = - (AlignedFullPath) IFullPath.convertToIFullPath(node.getAlignedPath()); - boolean ascending = node.getScanOrder() == Ordering.ASC; ++ + if (context.isBuildPlanUseTemplate()) { + Ordering scanOrder = context.getTemplatedInfo().getScanOrder(); + List<AggregationDescriptor> aggregationDescriptors; + if (node.getDescriptorType() == 0 || node.getDescriptorType() == 2) { + aggregationDescriptors = context.getTemplatedInfo().getAscendingDescriptorList(); + } else { + scanOrder = scanOrder.reverse(); + aggregationDescriptors = context.getTemplatedInfo().getDescendingDescriptorList(); + } + + return constructAlignedSeriesAggregationScanOperator( + node.getPlanNodeId(), + node.getAlignedPath(), + aggregationDescriptors, + context.getTemplatedInfo().getPushDownPredicate(), + scanOrder, + context.getTemplatedInfo().getGroupByTimeParameter(), + context.getTemplatedInfo().isOutputEndTime(), + context); + } + + return constructAlignedSeriesAggregationScanOperator( + node.getPlanNodeId(), + node.getAlignedPath(), + node.getAggregationDescriptorList(), + node.getPushDownPredicate(), + node.getScanOrder(), + node.getGroupByTimeParameter(), + node.isOutputEndTime(), + context); + } + + private Operator constructAlignedSeriesAggregationScanOperator( + PlanNodeId planNodeId, + AlignedPath alignedPath, + List<AggregationDescriptor> aggregationDescriptorList, + Expression pushDownPredicate, + Ordering scanOrder, + GroupByTimeParameter groupByTimeParameter, + boolean outputEndTime, + LocalExecutionPlanContext context) { ++ AlignedFullPath seriesPath = (AlignedFullPath) IFullPath.convertToIFullPath(alignedPath); + boolean ascending = scanOrder == ASC; List<Aggregator> aggregators = new ArrayList<>(); boolean canUseStatistics = true; - for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) { + for (AggregationDescriptor descriptor : aggregationDescriptorList) { checkArgument( descriptor.getInputExpressions().size() == 1, "descriptor's input expression size is not 1"); @@@ -621,8 -662,9 +670,8 @@@ ((TimeSeriesOperand) (descriptor.getInputExpressions().get(0))) .getPath() .getMeasurement(); - int seriesIndex = seriesPath.getMeasurementList().indexOf(inputSeries); - TSDataType seriesDataType = seriesPath.getSchemaList().get(seriesIndex).getType(); + int seriesIndex = alignedPath.getMeasurementList().indexOf(inputSeries); - TSDataType seriesDataType = - alignedPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex); ++ TSDataType seriesDataType = alignedPath.getSchemaList().get(seriesIndex).getType(); if (!judgeCanUseStatistics(descriptor.getAggregationType(), seriesDataType)) { canUseStatistics = false; } @@@ -690,10 -730,10 +737,10 @@@ AlignedSeriesAggregationScanOperator.class.getSimpleName()); AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = new AlignedSeriesAggregationScanOperator( - node.getPlanNodeId(), + planNodeId, - alignedPath, + seriesPath, - node.getScanOrder(), - node.isOutputEndTime(), + scanOrder, + outputEndTime, scanOptionsBuilder.build(), operatorContext, aggregators, @@@ -1443,7 -1500,11 +1509,11 @@@ node.getPredicate(), generateOnlyChildOperator(node, context), node.getOutputExpressions(), - getInputColumnTypes(node, context.getTypeProvider()), + node.getChildren().stream() + .map(PlanNode::getOutputColumnNames) + .flatMap(List::stream) - .map(context.getTypeProvider()::getType) ++ .map(context.getTypeProvider()::getTreeModelType) + .collect(Collectors.toList()), makeLayout(node), node.isKeepNull(), node.getPlanNodeId(), @@@ -1746,6 -1806,19 +1816,20 @@@ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList(); for (AggregationDescriptor descriptor : aggregationDescriptors) { List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); + List<TSDataType> dataTypes = new ArrayList<>(); + for (Expression expression : descriptor.getInputExpressions()) { + if (context.isBuildPlanUseTemplate() && expression instanceof TimeSeriesOperand) { + dataTypes.add( + context + .getTemplatedInfo() + .getSchemaMap() + .get(expression.getExpressionString()) + .getType()); + } else { - dataTypes.add(context.getTypeProvider().getType(expression.getExpressionString())); ++ dataTypes.add( ++ context.getTypeProvider().getTreeModelType(expression.getExpressionString())); + } + } aggregators.add( SlidingWindowAggregatorFactory.createSlidingWindowAggregator( descriptor.getAggregationFuncName(), diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index 7c8111e1f29,fbb10954e2f..b72c3a92142 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@@ -117,11 -204,15 +204,17 @@@ public class TemplatedLogicalPlan .getExpressionTypes() .forEach( (key, value) -> - context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value)); + context + .getTypeProvider() + .setTreeModelType(key.getNode().getOutputSymbol(), value)); } + List<Integer> deviceToMeasurementIndexes = + new ArrayList<>(analysis.getSelectExpressions().size() - 1); + for (int i = 1; i < analysis.getSelectExpressions().size(); i++) { + deviceToMeasurementIndexes.add(i); + } + context .getTypeProvider() .setTemplatedInfo( diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java index 42920df7561,c2ca7011039..4a4d168a321 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java @@@ -210,8 -209,8 +210,9 @@@ public class DistributionPlanner SubPlan subPlan = splitFragment(optimizedRootWithExchange); // Mark the root Fragment of root SubPlan as `root` subPlan.getPlanFragment().setRoot(true); + List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan); + // Only execute this step for READ operation if (context.getQueryType() == QueryType.READ) { setSinkForRootInstance(subPlan, fragmentInstances); diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java index f3696f1cee1,3b32f2932cb..f85d17bad28 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java @@@ -27,9 -27,9 +27,10 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; + import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.VirtualSourceNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelTypeProviderExtractor; import org.apache.tsfile.utils.ReadWriteIOUtils;
