This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 911a5879e0c39b83f4a25883227813d2125ed282
Merge: 8cec7b97bc7 fde91579d17
Author: JackieTien97 <[email protected]>
AuthorDate: Wed May 29 19:52:07 2024 +0800

    merge master

 .asf.yaml                                          |   2 +-
 .mvn/wrapper/maven-wrapper.properties              |   4 +-
 .../iotdb/it/env/cluster/config/MppJVMConfig.java  |  18 +
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   1 +
 .../it/env/cluster/node/ConfigNodeWrapper.java     |   1 +
 .../iotdb/it/framework/IoTDBTestReporter.java      |   9 +-
 .../org/apache/iotdb/db/it/IoTDBFlushQueryIT.java  |   7 +-
 ...oTDBAlignByDeviceWithTemplateAggregationIT.java | 542 +++++++++++++++++++++
 .../it/query/IoTDBSelectCompareExpressionIT.java   |  10 +-
 .../apache/iotdb/it/framework/IoTDBTestRunner.java |   3 +
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    |   2 +-
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     |   3 +-
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    |   4 +-
 .../assembly/resources/sbin/remove-confignode.bat  |   4 +-
 .../heartbeat/DataNodeHeartbeatHandler.java        |   6 +-
 .../response/pipe/task/PipeTableResp.java          |  95 ++--
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   |  48 +-
 .../runtime/PipeRuntimeCoordinator.java            |  10 +-
 .../runtime/heartbeat/PipeHeartbeat.java           |  32 +-
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  15 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  12 +-
 .../extractor/ConfigRegionListeningFilter.java     |   2 +-
 .../manager/pipe/metric/PipeConfigNodeMetrics.java |   2 +
 .../metric/PipeConfigNodeRemainingTimeMetrics.java |   9 +
 .../PipeConfigNodeRemainingTimeOperator.java       |   8 +-
 .../metric/PipeConfigRegionExtractorMetrics.java   |  14 +
 .../pipe/metric/PipeTemporaryMetaMetrics.java      | 175 +++++++
 .../confignode/persistence/pipe/PipeInfo.java      |  75 ++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   5 +-
 .../consensus/response/pipe/PipeTableRespTest.java |   4 +-
 .../assembly/resources/sbin/remove-datanode.bat    |   4 +-
 .../src/assembly/resources/sbin/remove-datanode.sh |   2 +-
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   6 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  24 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  77 ++-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  79 +--
 .../client/IoTDBDataNodeAsyncClientManager.java    |   2 +-
 .../IoTDBDataNodeCacheLeaderClientManager.java     |   4 +
 .../client/IoTDBDataNodeSyncClientManager.java     |   2 +-
 .../PipeTransferTabletInsertNodeEventHandler.java  |   6 +-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |   5 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |   9 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  16 +-
 .../schemaregion/SchemaRegionListeningFilter.java  |   2 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  |  11 +
 .../common/header/ColumnHeaderConstant.java        |   6 +-
 .../filter/AbstractMonthIntervalFillFilter.java    |   5 -
 .../fill/filter/MonthIntervalMSFillFilter.java     |   8 +-
 .../fill/filter/MonthIntervalNSFillFilter.java     |  12 +-
 .../fill/filter/MonthIntervalUSFillFilter.java     |  10 +-
 .../process/relational/StreamSortOperator.java     |   3 +-
 .../db/queryengine/plan/analyze/Analysis.java      |  16 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  10 +-
 .../plan/analyze/ExpressionTypeAnalyzer.java       |  10 +-
 .../plan/analyze/TemplatedAggregationAnalyze.java  | 260 ++++++++++
 .../queryengine/plan/analyze/TemplatedAnalyze.java | 156 +++---
 .../db/queryengine/plan/analyze/TemplatedInfo.java | 215 ++++++--
 .../execution/config/sys/pipe/ShowPipeTask.java    |  45 +-
 .../plan/optimization/AggregationPushDown.java     | 177 ++++++-
 .../plan/optimization/PredicatePushDown.java       |  16 +-
 .../plan/planner/LogicalPlanBuilder.java           |  14 +-
 .../plan/planner/LogicalPlanVisitor.java           |   2 +-
 .../plan/planner/OperatorTreeGenerator.java        | 188 +++++--
 .../plan/planner/SubPlanTypeExtractor.java         |   9 +
 .../plan/planner/TemplatedLogicalPlan.java         | 257 +++++++++-
 .../plan/planner/TemplatedLogicalPlanBuilder.java  |  69 ++-
 .../planner/distribution/DistributionPlanner.java  |   1 +
 .../plan/planner/plan/PlanFragment.java            |   6 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   2 +-
 .../plan/planner/plan/node/PlanNodeType.java       |   8 +-
 .../planner/plan/node/process/DeviceViewNode.java  |  41 ++
 .../plan/planner/plan/node/process/FilterNode.java |  59 ++-
 .../plan/node/process/RawDataAggregationNode.java  |  20 +
 .../plan/node/process/SingleDeviceViewNode.java    |   2 +-
 .../source/AlignedSeriesAggregationScanNode.java   |  70 ++-
 .../plan/parameter/AggregationDescriptor.java      |   2 +-
 .../iotdb/db/storageengine/StorageEngine.java      |   3 +-
 .../execute/task/AbstractCompactionTask.java       |  11 +
 .../execute/task/CrossSpaceCompactionTask.java     |  19 +
 .../execute/task/InnerSpaceCompactionTask.java     |  21 +
 .../compaction/schedule/CompactionScheduler.java   |  34 +-
 .../compaction/schedule/CompactionTaskManager.java |  13 +-
 .../compaction/schedule/CompactionTaskQueue.java   |   2 +
 .../aggregation/TimeRangeIteratorTest.java         | 229 ++++-----
 .../plan/optimization/TestPlanBuilder.java         |   3 +-
 .../logical/DataQueryLogicalPlannerTest.java       |  18 +-
 .../planner/node/process/FilterNodeSerdeTest.java  |   3 +-
 .../compaction/CompactionOverlapCheckTest.java     |   6 +
 .../FastInnerCompactionPerformerTest.java          |   7 +
 .../resources/conf/iotdb-common.properties         |   2 +-
 .../config/constant/PipeConnectorConstant.java     |   2 +-
 .../config/constant/PipeExtractorConstant.java     |  12 +-
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |   9 +-
 .../commons/pipe/task/meta/PipeTemporaryMeta.java  |  33 +-
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 .../src/main/thrift/confignode.thrift              |   2 +
 .../src/main/thrift/datanode.thrift                |   4 +
 pom.xml                                            |   4 +-
 98 files changed, 2912 insertions(+), 597 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 1577ee98eda,e792f580981..285faba2011
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@@ -110,6 -111,11 +111,11 @@@ public class PipeRawTabletInsertionEven
    @Override
    public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
      allocatedMemoryBlock.close();
+ 
+     // Record the deviceId before the memory is released,
+     // for later possibly updating the leader cache.
 -    deviceId = tablet.deviceId;
++    deviceId = tablet.getDeviceId();
+ 
      // Actually release the occupied memory.
      tablet = null;
      dataContainer = null;
@@@ -183,7 -189,8 +189,8 @@@
    }
  
    public String getDeviceId() {
-     return tablet.getDeviceId();
+     // NonNull indicates that the internallyDecreaseResourceReferenceCount 
has not been called.
 -    return Objects.nonNull(tablet) ? tablet.deviceId : deviceId;
++    return Objects.nonNull(tablet) ? tablet.getDeviceId() : deviceId;
    }
  
    /////////////////////////// TabletInsertionEvent ///////////////////////////
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
index d85ca438479,00000000000..1e1afe99e22
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
@@@ -1,381 -1,0 +1,382 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.operator.process.relational;
 +
- import com.google.common.util.concurrent.ListenableFuture;
 +import org.apache.iotdb.commons.exception.IoTDBException;
 +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 +import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 +import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
 +import 
org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
 +import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
 +import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 +import org.apache.iotdb.db.utils.datastructure.SortKey;
 +import org.apache.iotdb.db.utils.sort.DiskSpiller;
 +import org.apache.iotdb.db.utils.sort.MemoryReader;
 +import org.apache.iotdb.db.utils.sort.SortBufferManager;
 +import org.apache.iotdb.db.utils.sort.SortReader;
++
++import com.google.common.util.concurrent.ListenableFuture;
 +import org.apache.tsfile.block.column.ColumnBuilder;
 +import org.apache.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.tsfile.enums.TSDataType;
 +import org.apache.tsfile.read.common.block.TsBlock;
 +import org.apache.tsfile.read.common.block.TsBlockBuilder;
 +import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 +import org.apache.tsfile.utils.RamUsageEstimator;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.ArrayList;
 +import java.util.Comparator;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import static 
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
 +
 +public class StreamSortOperator implements ProcessOperator {
 +
 +  private static final long INSTANCE_SIZE =
 +      RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
 +  private final OperatorContext operatorContext;
 +  private final Operator inputOperator;
 +  private final TsBlockBuilder tsBlockBuilder;
 +
 +  // Use to output the result in memory.
 +  // Because the memory may be larger than tsBlockBuilder's max size
 +  // so the data may be return in multiple times.
 +  private int curRow = -1;
 +
 +  private List<SortKey> cachedData;
 +  private final Comparator<SortKey> comparator;
 +  private long cachedBytes;
 +  private final DiskSpiller diskSpiller;
 +  private final SortBufferManager sortBufferManager;
 +
 +  // For mergeSort
 +
 +  private MergeSortHeap mergeSortHeap;
 +  private List<SortReader> sortReaders;
 +  private boolean[] noMoreData;
 +
 +  private static final Logger logger = 
LoggerFactory.getLogger(SortOperator.class);
 +
 +  private final int maxReturnSize =
 +      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 +
 +  private long prepareUntilReadyCost = 0;
 +  private long dataSize = 0;
 +  private long sortCost = 0;
 +
 +  public StreamSortOperator(
 +      OperatorContext operatorContext,
 +      Operator inputOperator,
 +      List<TSDataType> dataTypes,
 +      String folderPath,
 +      Comparator<SortKey> comparator) {
 +    this.operatorContext = operatorContext;
 +    this.inputOperator = inputOperator;
 +    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
 +    this.cachedData = new ArrayList<>();
 +    this.comparator = comparator;
 +    this.cachedBytes = 0;
 +    this.diskSpiller =
 +        new DiskSpiller(folderPath, folderPath + 
operatorContext.getOperatorId(), dataTypes);
 +    this.sortBufferManager = new SortBufferManager();
 +  }
 +
 +  @Override
 +  public OperatorContext getOperatorContext() {
 +    return operatorContext;
 +  }
 +
 +  @Override
 +  public ListenableFuture<?> isBlocked() {
 +    return inputOperator.isBlocked();
 +  }
 +
 +  @Override
 +  public TsBlock next() throws Exception {
 +    if (!inputOperator.hasNextWithTimer()) {
 +      if (diskSpiller.hasSpilledData()) {
 +        try {
 +          prepareSortReaders();
 +          return mergeSort();
 +        } catch (Exception e) {
 +          clear();
 +          throw e;
 +        }
 +      } else {
 +        if (curRow == -1) {
 +          long startTime = System.nanoTime();
 +          cachedData.sort(comparator);
 +          sortCost += System.nanoTime() - startTime;
 +          curRow = 0;
 +        }
 +        return buildTsBlockInMemory();
 +      }
 +    }
 +    long startTime = System.nanoTime();
 +    try {
 +      TsBlock tsBlock = inputOperator.nextWithTimer();
 +      if (tsBlock == null) {
 +        return null;
 +      }
 +      dataSize += tsBlock.getRetainedSizeInBytes();
 +      cacheTsBlock(tsBlock);
 +    } catch (IoTDBException e) {
 +      clear();
 +      throw e;
 +    } finally {
 +      prepareUntilReadyCost += System.nanoTime() - startTime;
 +    }
 +
 +    return null;
 +  }
 +
 +  private void recordMetrics() {
 +    operatorContext.recordSpecifiedInfo("prepareCost/ns", 
Long.toString(prepareUntilReadyCost));
 +    operatorContext.recordSpecifiedInfo("sortedDataSize", 
Long.toString(dataSize));
 +    operatorContext.recordSpecifiedInfo("sortCost/ns", 
Long.toString(sortCost));
 +    int spilledFileSize = diskSpiller.getFileSize();
 +    if (spilledFileSize > 0) {
 +      operatorContext.recordSpecifiedInfo(
 +          "merge sort branch", Integer.toString(diskSpiller.getFileSize() + 
1));
 +    }
 +  }
 +
 +  private void prepareSortReaders() throws IoTDBException {
 +    if (sortReaders != null) {
 +      return;
 +    }
 +    sortReaders = new ArrayList<>();
 +    if (cachedBytes != 0) {
 +      cachedData.sort(comparator);
 +      if (sortBufferManager.allocate(cachedBytes)) {
 +        sortReaders.add(
 +            new MemoryReader(
 +                
cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList())));
 +      } else {
 +        sortBufferManager.allocateOneSortBranch();
 +        diskSpiller.spillSortedData(cachedData);
 +        cachedData = null;
 +      }
 +    }
 +    sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
 +    // if reader is finished
 +    noMoreData = new boolean[sortReaders.size()];
 +  }
 +
 +  private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
 +    long bytesSize = tsBlock.getRetainedSizeInBytes();
 +    if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
 +      cachedBytes += bytesSize;
 +      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
 +        cachedData.add(new MergeSortKey(tsBlock, i));
 +      }
 +    } else {
 +      cachedData.sort(comparator);
 +      spill();
 +      cachedData.clear();
 +      cachedBytes = bytesSize;
 +      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
 +        cachedData.add(new MergeSortKey(tsBlock, i));
 +      }
 +    }
 +  }
 +
 +  private void spill() throws IoTDBException {
 +    // if current memory cannot put this tsBlock, an exception will be thrown 
in spillSortedData()
 +    // because there should be at least 
tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
 +    // one branch.
 +    sortBufferManager.allocateOneSortBranch();
 +    diskSpiller.spillSortedData(cachedData);
 +  }
 +
 +  private TsBlock buildTsBlockInMemory() {
 +    tsBlockBuilder.reset();
 +    TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
 +    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
 +    for (int i = curRow; i < cachedData.size(); i++) {
 +      SortKey sortKey = cachedData.get(i);
 +      TsBlock tsBlock = sortKey.tsBlock;
 +      timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex));
 +      for (int j = 0; j < valueColumnBuilders.length; j++) {
 +        if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) {
 +          valueColumnBuilders[j].appendNull();
 +          continue;
 +        }
 +        valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex);
 +      }
 +      tsBlockBuilder.declarePosition();
 +      curRow++;
 +      if (tsBlockBuilder.isFull()) {
 +        break;
 +      }
 +    }
 +
 +    return tsBlockBuilder.build();
 +  }
 +
 +  private TsBlock mergeSort() throws IoTDBException {
 +
 +    // 1. fill the input from each reader
 +    initMergeSortHeap();
 +
 +    long startTime = System.nanoTime();
 +    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 +
 +    // 2. do merge sort until one TsBlock is consumed up
 +    tsBlockBuilder.reset();
 +    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
 +    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
 +    while (!mergeSortHeap.isEmpty()) {
 +
 +      MergeSortKey mergeSortKey = mergeSortHeap.poll();
 +      TsBlock targetBlock = mergeSortKey.tsBlock;
 +      
timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
 +      for (int i = 0; i < valueColumnBuilders.length; i++) {
 +        if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
 +          valueColumnBuilders[i].appendNull();
 +          continue;
 +        }
 +        valueColumnBuilders[i].write(targetBlock.getColumn(i), 
mergeSortKey.rowIndex);
 +      }
 +      tsBlockBuilder.declarePosition();
 +
 +      int readerIndex = mergeSortKey.inputChannelIndex;
 +      mergeSortKey = readNextMergeSortKey(readerIndex);
 +      if (mergeSortKey != null) {
 +        mergeSortHeap.push(mergeSortKey);
 +      } else {
 +        noMoreData[readerIndex] = true;
 +        sortBufferManager.releaseOneSortBranch();
 +      }
 +
 +      // break if time is out or tsBlockBuilder is full or sortBuffer is not 
enough
 +      if (System.nanoTime() - startTime > maxRuntime || 
tsBlockBuilder.isFull()) {
 +        break;
 +      }
 +    }
 +    sortCost += System.nanoTime() - startTime;
 +    return tsBlockBuilder.build();
 +  }
 +
 +  private void initMergeSortHeap() throws IoTDBException {
 +    if (mergeSortHeap == null) {
 +      mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
 +      for (int i = 0; i < sortReaders.size(); i++) {
 +        SortReader sortReader = sortReaders.get(i);
 +        if (sortReader.hasNext()) {
 +          MergeSortKey mergeSortKey = sortReader.next();
 +          mergeSortKey.inputChannelIndex = i;
 +          mergeSortHeap.push(mergeSortKey);
 +        } else {
 +          noMoreData[i] = true;
 +          sortBufferManager.releaseOneSortBranch();
 +        }
 +      }
 +    }
 +  }
 +
 +  private MergeSortKey readNextMergeSortKey(int readerIndex) throws 
IoTDBException {
 +    SortReader sortReader = sortReaders.get(readerIndex);
 +    if (sortReader.hasNext()) {
 +      MergeSortKey mergeSortKey = sortReader.next();
 +      mergeSortKey.inputChannelIndex = readerIndex;
 +      return mergeSortKey;
 +    }
 +    return null;
 +  }
 +
 +  private boolean hasMoreData() {
 +    if (noMoreData == null) {
 +      return true;
 +    }
 +    for (boolean noMore : noMoreData) {
 +      if (!noMore) {
 +        return true;
 +      }
 +    }
 +    return false;
 +  }
 +
 +  public void clear() {
 +    if (!diskSpiller.hasSpilledData()) {
 +      return;
 +    }
 +    try {
 +      if (sortReaders != null) {
 +        for (SortReader sortReader : sortReaders) {
 +          sortReader.close();
 +        }
 +      }
 +    } catch (Exception e) {
 +      logger.error("Fail to close fileChannel", e);
 +    }
 +  }
 +
 +  @Override
 +  public boolean hasNext() throws Exception {
 +    return inputOperator.hasNextWithTimer()
 +        || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
 +        || (diskSpiller.hasSpilledData() && hasMoreData());
 +  }
 +
 +  @Override
 +  public void close() throws Exception {
 +    recordMetrics();
 +    cachedData = null;
 +    clear();
 +    inputOperator.close();
 +  }
 +
 +  @Override
 +  public boolean isFinished() throws Exception {
 +    return !this.hasNextWithTimer();
 +  }
 +
 +  @Override
 +  public long calculateMaxPeekMemory() {
 +    return inputOperator.calculateMaxPeekMemoryWithCounter()
 +        + inputOperator.calculateRetainedSizeAfterCallingNext()
 +        + SORT_BUFFER_SIZE;
 +  }
 +
 +  @Override
 +  public long calculateMaxReturnSize() {
 +    return maxReturnSize;
 +  }
 +
 +  @Override
 +  public long calculateRetainedSizeAfterCallingNext() {
 +    return inputOperator.calculateRetainedSizeAfterCallingNext() + 
SORT_BUFFER_SIZE;
 +  }
 +
 +  @Override
 +  public long ramBytesUsed() {
 +    return INSTANCE_SIZE
 +        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
 +        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
 +        + RamUsageEstimator.sizeOf(noMoreData)
 +        + tsBlockBuilder.getRetainedSizeInBytes();
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index da056f9ec36,ae1e7665800..68d2f5d00f6
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@@ -530,9 -532,8 +537,9 @@@ public class OperatorTreeGenerator exte
    @Override
    public Operator visitSeriesAggregationScan(
        SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
 -    PartialPath seriesPath = node.getSeriesPath();
 +    NonAlignedFullPath seriesPath =
 +        (NonAlignedFullPath) 
IFullPath.convertToIFullPath(node.getSeriesPath());
-     boolean ascending = node.getScanOrder() == Ordering.ASC;
+     boolean ascending = node.getScanOrder() == ASC;
      List<AggregationDescriptor> aggregationDescriptors = 
node.getAggregationDescriptorList();
      List<Aggregator> aggregators = new ArrayList<>();
      aggregationDescriptors.forEach(
@@@ -606,12 -607,51 +613,53 @@@
    @Override
    public Operator visitAlignedSeriesAggregationScan(
        AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext 
context) {
-     AlignedFullPath seriesPath =
-         (AlignedFullPath) IFullPath.convertToIFullPath(node.getAlignedPath());
-     boolean ascending = node.getScanOrder() == Ordering.ASC;
++
+     if (context.isBuildPlanUseTemplate()) {
+       Ordering scanOrder = context.getTemplatedInfo().getScanOrder();
+       List<AggregationDescriptor> aggregationDescriptors;
+       if (node.getDescriptorType() == 0 || node.getDescriptorType() == 2) {
+         aggregationDescriptors = 
context.getTemplatedInfo().getAscendingDescriptorList();
+       } else {
+         scanOrder = scanOrder.reverse();
+         aggregationDescriptors = 
context.getTemplatedInfo().getDescendingDescriptorList();
+       }
+ 
+       return constructAlignedSeriesAggregationScanOperator(
+           node.getPlanNodeId(),
+           node.getAlignedPath(),
+           aggregationDescriptors,
+           context.getTemplatedInfo().getPushDownPredicate(),
+           scanOrder,
+           context.getTemplatedInfo().getGroupByTimeParameter(),
+           context.getTemplatedInfo().isOutputEndTime(),
+           context);
+     }
+ 
+     return constructAlignedSeriesAggregationScanOperator(
+         node.getPlanNodeId(),
+         node.getAlignedPath(),
+         node.getAggregationDescriptorList(),
+         node.getPushDownPredicate(),
+         node.getScanOrder(),
+         node.getGroupByTimeParameter(),
+         node.isOutputEndTime(),
+         context);
+   }
+ 
+   private Operator constructAlignedSeriesAggregationScanOperator(
+       PlanNodeId planNodeId,
+       AlignedPath alignedPath,
+       List<AggregationDescriptor> aggregationDescriptorList,
+       Expression pushDownPredicate,
+       Ordering scanOrder,
+       GroupByTimeParameter groupByTimeParameter,
+       boolean outputEndTime,
+       LocalExecutionPlanContext context) {
++    AlignedFullPath seriesPath = (AlignedFullPath) 
IFullPath.convertToIFullPath(alignedPath);
+     boolean ascending = scanOrder == ASC;
      List<Aggregator> aggregators = new ArrayList<>();
      boolean canUseStatistics = true;
-     for (AggregationDescriptor descriptor : 
node.getAggregationDescriptorList()) {
+     for (AggregationDescriptor descriptor : aggregationDescriptorList) {
        checkArgument(
            descriptor.getInputExpressions().size() == 1,
            "descriptor's input expression size is not 1");
@@@ -621,8 -662,9 +670,8 @@@
              ((TimeSeriesOperand) (descriptor.getInputExpressions().get(0)))
                  .getPath()
                  .getMeasurement();
-         int seriesIndex = 
seriesPath.getMeasurementList().indexOf(inputSeries);
-         TSDataType seriesDataType = 
seriesPath.getSchemaList().get(seriesIndex).getType();
+         int seriesIndex = 
alignedPath.getMeasurementList().indexOf(inputSeries);
 -        TSDataType seriesDataType =
 -            
alignedPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex);
++        TSDataType seriesDataType = 
alignedPath.getSchemaList().get(seriesIndex).getType();
          if (!judgeCanUseStatistics(descriptor.getAggregationType(), 
seriesDataType)) {
            canUseStatistics = false;
          }
@@@ -690,10 -730,10 +737,10 @@@
                  AlignedSeriesAggregationScanOperator.class.getSimpleName());
      AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
          new AlignedSeriesAggregationScanOperator(
-             node.getPlanNodeId(),
+             planNodeId,
 -            alignedPath,
 +            seriesPath,
-             node.getScanOrder(),
-             node.isOutputEndTime(),
+             scanOrder,
+             outputEndTime,
              scanOptionsBuilder.build(),
              operatorContext,
              aggregators,
@@@ -1443,7 -1500,11 +1509,11 @@@
          node.getPredicate(),
          generateOnlyChildOperator(node, context),
          node.getOutputExpressions(),
-         getInputColumnTypes(node, context.getTypeProvider()),
+         node.getChildren().stream()
+             .map(PlanNode::getOutputColumnNames)
+             .flatMap(List::stream)
 -            .map(context.getTypeProvider()::getType)
++            .map(context.getTypeProvider()::getTreeModelType)
+             .collect(Collectors.toList()),
          makeLayout(node),
          node.isKeepNull(),
          node.getPlanNodeId(),
@@@ -1746,6 -1806,19 +1816,20 @@@
      List<AggregationDescriptor> aggregationDescriptors = 
node.getAggregationDescriptorList();
      for (AggregationDescriptor descriptor : aggregationDescriptors) {
        List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
+       List<TSDataType> dataTypes = new ArrayList<>();
+       for (Expression expression : descriptor.getInputExpressions()) {
+         if (context.isBuildPlanUseTemplate() && expression instanceof 
TimeSeriesOperand) {
+           dataTypes.add(
+               context
+                   .getTemplatedInfo()
+                   .getSchemaMap()
+                   .get(expression.getExpressionString())
+                   .getType());
+         } else {
 -          
dataTypes.add(context.getTypeProvider().getType(expression.getExpressionString()));
++          dataTypes.add(
++              
context.getTypeProvider().getTreeModelType(expression.getExpressionString()));
+         }
+       }
        aggregators.add(
            SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
                descriptor.getAggregationFuncName(),
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
index 7c8111e1f29,fbb10954e2f..b72c3a92142
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@@ -117,11 -204,15 +204,17 @@@ public class TemplatedLogicalPlan 
            .getExpressionTypes()
            .forEach(
                (key, value) ->
 -                  
context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value));
 +                  context
 +                      .getTypeProvider()
 +                      .setTreeModelType(key.getNode().getOutputSymbol(), 
value));
      }
  
+     List<Integer> deviceToMeasurementIndexes =
+         new ArrayList<>(analysis.getSelectExpressions().size() - 1);
+     for (int i = 1; i < analysis.getSelectExpressions().size(); i++) {
+       deviceToMeasurementIndexes.add(i);
+     }
+ 
      context
          .getTypeProvider()
          .setTemplatedInfo(
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 42920df7561,c2ca7011039..4a4d168a321
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@@ -210,8 -209,8 +210,9 @@@ public class DistributionPlanner 
      SubPlan subPlan = splitFragment(optimizedRootWithExchange);
      // Mark the root Fragment of root SubPlan as `root`
      subPlan.getPlanFragment().setRoot(true);
 +
      List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
+ 
      // Only execute this step for READ operation
      if (context.getQueryType() == QueryType.READ) {
        setSinkForRootInstance(subPlan, fragmentInstances);
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index f3696f1cee1,3b32f2932cb..f85d17bad28
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@@ -27,9 -27,9 +27,10 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode;
  import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.VirtualSourceNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelTypeProviderExtractor;
  
  import org.apache.tsfile.utils.ReadWriteIOUtils;
  

Reply via email to