This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TableScanEachLimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6bc4c5eb4849441f0833ae62e84386c0a5ed796e
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jun 6 16:55:10 2024 +0800

    Add support for StreamSortOperator
---
 ...SortOperator.java => AbstractSortOperator.java} | 127 ++++-----
 .../execution/operator/process/SortOperator.java   | 309 +--------------------
 .../operator/process/StreamSortOperator.java       | 203 ++++++++++++++
 .../plan/planner/TableOperatorGenerator.java       |  53 ++++
 .../planner/plan/parameter/SeriesScanOptions.java  |  32 ++-
 .../apache/iotdb/db/utils/sort/DiskSpiller.java    |   4 +
 6 files changed, 347 insertions(+), 381 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
similarity index 79%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index e34112c2903..bb8c7a16c2c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.operator.process.relational;
+package org.apache.iotdb.db.queryengine.execution.operator.process;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
-import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
 import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -40,7 +37,6 @@ import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
-import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,15 +48,13 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
 
-public class StreamSortOperator implements ProcessOperator {
+public abstract class AbstractSortOperator implements ProcessOperator {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(StreamSortOperator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSortOperator.class);
 
-  private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
-  private final OperatorContext operatorContext;
-  private final Operator inputOperator;
-  private final TsBlockBuilder tsBlockBuilder;
+  protected final OperatorContext operatorContext;
+  protected final Operator inputOperator;
+  protected final TsBlockBuilder tsBlockBuilder;
 
   // Use to output the result in memory.
   // Because the memory may be larger than tsBlockBuilder's max size
@@ -71,22 +65,22 @@ public class StreamSortOperator implements ProcessOperator {
   private final Comparator<SortKey> comparator;
   private long cachedBytes;
   private final DiskSpiller diskSpiller;
-  private final SortBufferManager sortBufferManager;
+  private SortBufferManager sortBufferManager;
 
   // For mergeSort
 
   private MergeSortHeap mergeSortHeap;
   private List<SortReader> sortReaders;
-  private boolean[] noMoreData;
+  protected boolean[] noMoreData;
 
   private final int maxReturnSize =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
-  private long prepareUntilReadyCost = 0;
-  private long dataSize = 0;
+  protected long prepareUntilReadyCost = 0;
+  protected long dataSize = 0;
   private long sortCost = 0;
 
-  public StreamSortOperator(
+  AbstractSortOperator(
       OperatorContext operatorContext,
       Operator inputOperator,
       List<TSDataType> dataTypes,
@@ -103,6 +97,26 @@ public class StreamSortOperator implements ProcessOperator {
     this.sortBufferManager = new SortBufferManager();
   }
 
+  protected void buildResult() throws IoTDBException {
+    if (diskSpiller.hasSpilledData()) {
+      try {
+        prepareSortReaders();
+        mergeSort();
+      } catch (Exception e) {
+        clear();
+        throw e;
+      }
+    } else {
+      if (curRow == -1) {
+        long startTime = System.nanoTime();
+        cachedData.sort(comparator);
+        sortCost += System.nanoTime() - startTime;
+        curRow = 0;
+      }
+      buildTsBlockInMemory();
+    }
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
     return operatorContext;
@@ -113,45 +127,6 @@ public class StreamSortOperator implements ProcessOperator {
     return inputOperator.isBlocked();
   }
 
-  @Override
-  public TsBlock next() throws Exception {
-    if (!inputOperator.hasNextWithTimer()) {
-      if (diskSpiller.hasSpilledData()) {
-        try {
-          prepareSortReaders();
-          return mergeSort();
-        } catch (Exception e) {
-          clear();
-          throw e;
-        }
-      } else {
-        if (curRow == -1) {
-          long startTime = System.nanoTime();
-          cachedData.sort(comparator);
-          sortCost += System.nanoTime() - startTime;
-          curRow = 0;
-        }
-        return buildTsBlockInMemory();
-      }
-    }
-    long startTime = System.nanoTime();
-    try {
-      TsBlock tsBlock = inputOperator.nextWithTimer();
-      if (tsBlock == null) {
-        return null;
-      }
-      dataSize += tsBlock.getRetainedSizeInBytes();
-      cacheTsBlock(tsBlock);
-    } catch (IoTDBException e) {
-      clear();
-      throw e;
-    } finally {
-      prepareUntilReadyCost += System.nanoTime() - startTime;
-    }
-
-    return null;
-  }
-
   private void recordMetrics() {
     operatorContext.recordSpecifiedInfo("prepareCost/ns", Long.toString(prepareUntilReadyCost));
     operatorContext.recordSpecifiedInfo("sortedDataSize", Long.toString(dataSize));
@@ -185,7 +160,7 @@ public class StreamSortOperator implements ProcessOperator {
     noMoreData = new boolean[sortReaders.size()];
   }
 
-  private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+  protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
     long bytesSize = tsBlock.getRetainedSizeInBytes();
     if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
       cachedBytes += bytesSize;
@@ -211,8 +186,7 @@ public class StreamSortOperator implements ProcessOperator {
     diskSpiller.spillSortedData(cachedData);
   }
 
-  private TsBlock buildTsBlockInMemory() {
-    tsBlockBuilder.reset();
+  private void buildTsBlockInMemory() {
     TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
     for (int i = curRow; i < cachedData.size(); i++) {
@@ -232,11 +206,9 @@ public class StreamSortOperator implements ProcessOperator {
         break;
       }
     }
-
-    return tsBlockBuilder.build();
   }
 
-  private TsBlock mergeSort() throws IoTDBException {
+  private void mergeSort() throws IoTDBException {
 
     // 1. fill the input from each reader
     initMergeSortHeap();
@@ -245,7 +217,6 @@ public class StreamSortOperator implements ProcessOperator {
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 
     // 2. do merge sort until one TsBlock is consumed up
-    tsBlockBuilder.reset();
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
     while (!mergeSortHeap.isEmpty()) {
@@ -277,7 +248,6 @@ public class StreamSortOperator implements ProcessOperator {
       }
     }
     sortCost += System.nanoTime() - startTime;
-    return tsBlockBuilder.build();
   }
 
   private void initMergeSortHeap() throws IoTDBException {
@@ -329,15 +299,20 @@ public class StreamSortOperator implements ProcessOperator {
           sortReader.close();
         }
       }
+      sortReaders = null;
+      diskSpiller.reset();
     } catch (Exception e) {
-      LOGGER.error("Fail to close fileChannel", e);
+      LOGGER.warn("Fail to close fileChannel", e);
     }
   }
 
   @Override
   public boolean hasNext() throws Exception {
-    return inputOperator.hasNextWithTimer()
-        || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+    return inputOperator.hasNextWithTimer() || hasMoreSortedData();
+  }
+
+  protected boolean hasMoreSortedData() {
+    return (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
         || (diskSpiller.hasSpilledData() && hasMoreData());
   }
 
@@ -371,12 +346,16 @@ public class StreamSortOperator implements ProcessOperator {
     return inputOperator.calculateRetainedSizeAfterCallingNext() + SORT_BUFFER_SIZE;
   }
 
-  @Override
-  public long ramBytesUsed() {
-    return INSTANCE_SIZE
-        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
-        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
-        + RamUsageEstimator.sizeOf(noMoreData)
-        + tsBlockBuilder.getRetainedSizeInBytes();
+  protected void resetSortRelatedResource() {
+    curRow = -1;
+    cachedData.clear();
+    cachedBytes = 0;
+    clear();
+    sortBufferManager = new SortBufferManager();
+    if (!mergeSortHeap.isEmpty()) {
+      throw new IllegalStateException("mergeSortHeap should be empty!");
+    }
+    mergeSortHeap = null;
+    noMoreData = null;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
index 55453353f0d..0f9ca2bd848 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
@@ -23,66 +23,19 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
-import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
-import org.apache.iotdb.db.utils.sort.DiskSpiller;
-import org.apache.iotdb.db.utils.sort.MemoryReader;
-import org.apache.iotdb.db.utils.sort.SortBufferManager;
-import org.apache.iotdb.db.utils.sort.SortReader;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.RamUsageEstimator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
-
-public class SortOperator implements ProcessOperator {
+public class SortOperator extends AbstractSortOperator {
 
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
-  private final OperatorContext operatorContext;
-  private final Operator inputOperator;
-  private final TsBlockBuilder tsBlockBuilder;
-
-  // Use to output the result in memory.
-  // Because the memory may be larger than tsBlockBuilder's max size
-  // so the data may be return in multiple times.
-  private int curRow = -1;
-
-  private List<SortKey> cachedData;
-  private final Comparator<SortKey> comparator;
-  private long cachedBytes;
-  private final DiskSpiller diskSpiller;
-  private final SortBufferManager sortBufferManager;
-
-  // For mergeSort
-
-  private MergeSortHeap mergeSortHeap;
-  private List<SortReader> sortReaders;
-  private boolean[] noMoreData;
-
-  private static final Logger logger = LoggerFactory.getLogger(SortOperator.class);
-
-  private final int maxReturnSize =
-      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
-  private long prepareUntilReadyCost = 0;
-  private long dataSize = 0;
-  private long sortCost = 0;
 
   public SortOperator(
       OperatorContext operatorContext,
@@ -90,47 +43,16 @@ public class SortOperator implements ProcessOperator {
       List<TSDataType> dataTypes,
       String folderPath,
       Comparator<SortKey> comparator) {
-    this.operatorContext = operatorContext;
-    this.inputOperator = inputOperator;
-    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.cachedData = new ArrayList<>();
-    this.comparator = comparator;
-    this.cachedBytes = 0;
-    this.diskSpiller =
-        new DiskSpiller(folderPath, folderPath + operatorContext.getOperatorId(), dataTypes);
-    this.sortBufferManager = new SortBufferManager();
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return inputOperator.isBlocked();
+    super(operatorContext, inputOperator, dataTypes, folderPath, comparator);
   }
 
   @Override
   public TsBlock next() throws Exception {
     if (!inputOperator.hasNextWithTimer()) {
-      if (diskSpiller.hasSpilledData()) {
-        try {
-          prepareSortReaders();
-          return mergeSort();
-        } catch (Exception e) {
-          clear();
-          throw e;
-        }
-      } else {
-        if (curRow == -1) {
-          long startTime = System.nanoTime();
-          cachedData.sort(comparator);
-          sortCost += System.nanoTime() - startTime;
-          curRow = 0;
-        }
-        return buildTsBlockInMemory();
-      }
+      buildResult();
+      TsBlock res = tsBlockBuilder.build();
+      tsBlockBuilder.reset();
+      return res;
     }
     long startTime = System.nanoTime();
     try {
@@ -150,225 +72,6 @@ public class SortOperator implements ProcessOperator {
     return null;
   }
 
-  private void recordMetrics() {
-    operatorContext.recordSpecifiedInfo("prepareCost/ns", Long.toString(prepareUntilReadyCost));
-    operatorContext.recordSpecifiedInfo("sortedDataSize", Long.toString(dataSize));
-    operatorContext.recordSpecifiedInfo("sortCost/ns", Long.toString(sortCost));
-    int spilledFileSize = diskSpiller.getFileSize();
-    if (spilledFileSize > 0) {
-      operatorContext.recordSpecifiedInfo(
-          "merge sort branch", Integer.toString(diskSpiller.getFileSize() + 1));
-    }
-  }
-
-  private void prepareSortReaders() throws IoTDBException {
-    if (sortReaders != null) {
-      return;
-    }
-    sortReaders = new ArrayList<>();
-    if (cachedBytes != 0) {
-      cachedData.sort(comparator);
-      if (sortBufferManager.allocate(cachedBytes)) {
-        sortReaders.add(
-            new MemoryReader(
-                cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList())));
-      } else {
-        sortBufferManager.allocateOneSortBranch();
-        diskSpiller.spillSortedData(cachedData);
-        cachedData = null;
-      }
-    }
-    sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
-    // if reader is finished
-    noMoreData = new boolean[sortReaders.size()];
-  }
-
-  private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
-    long bytesSize = tsBlock.getRetainedSizeInBytes();
-    if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
-      cachedBytes += bytesSize;
-      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-        cachedData.add(new MergeSortKey(tsBlock, i));
-      }
-    } else {
-      cachedData.sort(comparator);
-      spill();
-      cachedData.clear();
-      cachedBytes = bytesSize;
-      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-        cachedData.add(new MergeSortKey(tsBlock, i));
-      }
-    }
-  }
-
-  private void spill() throws IoTDBException {
-    // if current memory cannot put this tsBlock, an exception will be thrown in spillSortedData()
-    // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
-    // one branch.
-    sortBufferManager.allocateOneSortBranch();
-    diskSpiller.spillSortedData(cachedData);
-  }
-
-  private TsBlock buildTsBlockInMemory() {
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    for (int i = curRow; i < cachedData.size(); i++) {
-      SortKey sortKey = cachedData.get(i);
-      TsBlock tsBlock = sortKey.tsBlock;
-      timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex));
-      for (int j = 0; j < valueColumnBuilders.length; j++) {
-        if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) {
-          valueColumnBuilders[j].appendNull();
-          continue;
-        }
-        valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex);
-      }
-      tsBlockBuilder.declarePosition();
-      curRow++;
-      if (tsBlockBuilder.isFull()) {
-        break;
-      }
-    }
-
-    return tsBlockBuilder.build();
-  }
-
-  private TsBlock mergeSort() throws IoTDBException {
-
-    // 1. fill the input from each reader
-    initMergeSortHeap();
-
-    long startTime = System.nanoTime();
-    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-
-    // 2. do merge sort until one TsBlock is consumed up
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    while (!mergeSortHeap.isEmpty()) {
-
-      MergeSortKey mergeSortKey = mergeSortHeap.poll();
-      TsBlock targetBlock = mergeSortKey.tsBlock;
-      timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
-      for (int i = 0; i < valueColumnBuilders.length; i++) {
-        if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
-          valueColumnBuilders[i].appendNull();
-          continue;
-        }
-        valueColumnBuilders[i].write(targetBlock.getColumn(i), mergeSortKey.rowIndex);
-      }
-      tsBlockBuilder.declarePosition();
-
-      int readerIndex = mergeSortKey.inputChannelIndex;
-      mergeSortKey = readNextMergeSortKey(readerIndex);
-      if (mergeSortKey != null) {
-        mergeSortHeap.push(mergeSortKey);
-      } else {
-        noMoreData[readerIndex] = true;
-        sortBufferManager.releaseOneSortBranch();
-      }
-
-      // break if time is out or tsBlockBuilder is full or sortBuffer is not enough
-      if (System.nanoTime() - startTime > maxRuntime || tsBlockBuilder.isFull()) {
-        break;
-      }
-    }
-    sortCost += System.nanoTime() - startTime;
-    return tsBlockBuilder.build();
-  }
-
-  private void initMergeSortHeap() throws IoTDBException {
-    if (mergeSortHeap == null) {
-      mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
-      for (int i = 0; i < sortReaders.size(); i++) {
-        SortReader sortReader = sortReaders.get(i);
-        if (sortReader.hasNext()) {
-          MergeSortKey mergeSortKey = sortReader.next();
-          mergeSortKey.inputChannelIndex = i;
-          mergeSortHeap.push(mergeSortKey);
-        } else {
-          noMoreData[i] = true;
-          sortBufferManager.releaseOneSortBranch();
-        }
-      }
-    }
-  }
-
-  private MergeSortKey readNextMergeSortKey(int readerIndex) throws IoTDBException {
-    SortReader sortReader = sortReaders.get(readerIndex);
-    if (sortReader.hasNext()) {
-      MergeSortKey mergeSortKey = sortReader.next();
-      mergeSortKey.inputChannelIndex = readerIndex;
-      return mergeSortKey;
-    }
-    return null;
-  }
-
-  private boolean hasMoreData() {
-    if (noMoreData == null) {
-      return true;
-    }
-    for (boolean noMore : noMoreData) {
-      if (!noMore) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void clear() {
-    if (!diskSpiller.hasSpilledData()) {
-      return;
-    }
-    try {
-      if (sortReaders != null) {
-        for (SortReader sortReader : sortReaders) {
-          sortReader.close();
-        }
-      }
-    } catch (Exception e) {
-      logger.error("Fail to close fileChannel", e);
-    }
-  }
-
-  @Override
-  public boolean hasNext() throws Exception {
-    return inputOperator.hasNextWithTimer()
-        || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
-        || (diskSpiller.hasSpilledData() && hasMoreData());
-  }
-
-  @Override
-  public void close() throws Exception {
-    recordMetrics();
-    cachedData = null;
-    clear();
-    inputOperator.close();
-  }
-
-  @Override
-  public boolean isFinished() throws Exception {
-    return !this.hasNextWithTimer();
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return inputOperator.calculateMaxPeekMemoryWithCounter()
-        + inputOperator.calculateRetainedSizeAfterCallingNext()
-        + SORT_BUFFER_SIZE;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return inputOperator.calculateRetainedSizeAfterCallingNext() + SORT_BUFFER_SIZE;
-  }
-
   @Override
   public long ramBytesUsed() {
     return INSTANCE_SIZE
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
new file mode 100644
index 00000000000..d53a3480f93
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Comparator;
+import java.util.List;
+
+public class StreamSortOperator extends AbstractSortOperator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(StreamSortOperator.class);
+
+  private final Comparator<SortKey> streamSortComparator;
+
+  private final int minLinesToOutput;
+
+  // accumulated unprinted line count
+  private long remainingCount = 0;
+
+  // child operator has no more data
+  private boolean noMoreDataFromChild = false;
+
+  // unprocessed data from the previous iteration
+  private TsBlock currentTsBlock = null;
+
+  // be able to start stream outputting partial data
+  private boolean canStreamOutput = false;
+
+  private SortKey lastRow = null;
+
+  public StreamSortOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> dataTypes,
+      String folderPath,
+      Comparator<SortKey> comparator,
+      Comparator<SortKey> streamSortComparator,
+      int minLinesToOutput) {
+    super(operatorContext, inputOperator, dataTypes, folderPath, comparator);
+    this.streamSortComparator = streamSortComparator;
+    this.minLinesToOutput = minLinesToOutput;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (canStreamOutput || !tsBlockBuilder.isEmpty()) {
+
+      buildResult();
+
+      // this time stream output data has been consumed up
+      if (!hasMoreSortedData()) {
+        canStreamOutput = false;
+        // clear spilled disk space and memory data structure
+        resetSortRelatedResource();
+      }
+
+      if (tsBlockBuilder.isFull() || consumedUp()) {
+        TsBlock res = tsBlockBuilder.build();
+        remainingCount -= res.getPositionCount();
+        tsBlockBuilder.reset();
+        return res;
+      }
+    }
+
+    if (currentTsBlock != null) {
+      cacheTsBlock(currentTsBlock);
+      currentTsBlock = null;
+    }
+
+    long startTime = System.nanoTime();
+    if (!inputOperator.hasNextWithTimer()) {
+      noMoreDataFromChild = true;
+      canStreamOutput = true;
+    } else {
+      try {
+        // init currentTsBlock from child operator
+        currentTsBlock = inputOperator.nextWithTimer();
+        if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+          currentTsBlock = null;
+          return null;
+        }
+        // record total sorted data size
+        dataSize += currentTsBlock.getRetainedSizeInBytes();
+
+        // if currentTsBlock line count + remainingCount is still less than minLinesToOutput, just
+        // cache it
+        if (currentTsBlock.getPositionCount() + remainingCount < minLinesToOutput) {
+          cacheTsBlock(currentTsBlock);
+          remainingCount += currentTsBlock.getPositionCount();
+          currentTsBlock = null;
+        } else {
+          // if stream compare key of the last row of currentTsBlock is same as the last row of
+          // previous TsBlock, we cannot stream output, just cache it
+          if (isStreamCompareKeySame()) {
+            cacheTsBlock(currentTsBlock);
+            remainingCount += currentTsBlock.getPositionCount();
+            currentTsBlock = null;
+          } else {
+            // traverse the current TsBlock backward until encountering a row with a StreamCompKey
+            // different from the last row, and return the index of that row
+            int endIndex = getEndIndexFromCurrentTsBlock();
+            // if total count of `canOutput` lines is less than minLinesToOutput, we won't output
+            // them, we just cache the whole currentTsBlock
+            if (endIndex == -1 || endIndex + remainingCount + 1 < minLinesToOutput) {
+              cacheTsBlock(currentTsBlock);
+              remainingCount += currentTsBlock.getPositionCount();
+              currentTsBlock = null;
+            } else {
+              // `canOutput` lines count is larger than minLinesToOutput, so we can stream output
+              cacheTsBlock(currentTsBlock.getRegion(0, endIndex + 1));
+              remainingCount += currentTsBlock.getPositionCount();
+              canStreamOutput = true;
+              currentTsBlock = currentTsBlock.subTsBlock(endIndex + 1);
+            }
+          }
+        }
+      } catch (IoTDBException e) {
+        clear();
+        throw e;
+      } finally {
+        prepareUntilReadyCost += System.nanoTime() - startTime;
+      }
+    }
+    return null;
+  }
+
+  private boolean consumedUp() {
+    return remainingCount == tsBlockBuilder.getPositionCount() && noMoreDataFromChild;
+  }
+
+  // return -1, if not found
+  private int getEndIndexFromCurrentTsBlock() {
+    SortKey sortKeyOfLastRow = new SortKey(currentTsBlock, currentTsBlock.getPositionCount() - 1);
+    SortKey sortKeyOfFoundRow = new SortKey(currentTsBlock, currentTsBlock.getPositionCount() - 2);
+    for (int index = currentTsBlock.getPositionCount() - 2; index >= 0; index--) {
+      sortKeyOfFoundRow.rowIndex = index;
+      if (streamSortComparator.compare(sortKeyOfFoundRow, sortKeyOfLastRow) != 0) {
+        return index;
+      }
+    }
+    return -1;
+  }
+
+  private boolean isStreamCompareKeySame() {
+    return lastRow == null
+        || streamSortComparator.compare(
+                lastRow, new SortKey(currentTsBlock, currentTsBlock.getPositionCount() - 1))
+            == 0;
+  }
+
+  protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+    super.cacheTsBlock(tsBlock);
+    lastRow = new SortKey(tsBlock, tsBlock.getPositionCount() - 1);
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return super.hasNext() || !tsBlockBuilder.isEmpty() || currentTsBlock != null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    lastRow = null;
+    currentTsBlock = null;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + RamUsageEstimator.sizeOf(noMoreData)
+        + tsBlockBuilder.getRetainedSizeInBytes();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 73b333a8c86..d7d62ec2e86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
@@ -63,6 +64,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Expression;
@@ -240,6 +242,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     SeriesScanOptions.Builder scanOptionsBuilder = 
getSeriesScanOptionsBuilder(context);
     scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
     scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
+    
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
     scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
 
     Expression pushDownPredicate = node.getPushDownPredicate();
@@ -711,4 +714,54 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
               }
             });
   }
+
+  @Override
+  public Operator visitStreamSort(StreamSortNode node, 
LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                StreamSortNode.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
+
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount);
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount);
+    genSortInformation(
+        node.getOutputSymbols(),
+        node.getOrderingScheme(),
+        sortItemIndexList,
+        sortItemDataTypeList,
+        context.getTypeProvider());
+
+    String filePrefix =
+        IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+            + File.separator
+            + 
operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId()
+            + File.separator
+            + operatorContext.getDriverContext().getPipelineId()
+            + File.separator;
+
+    context.getDriverContext().setHaveTmpFile(true);
+    
context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true);
+
+    Operator child = node.getChild().accept(this, context);
+
+    return new StreamSortOperator(
+        operatorContext,
+        child,
+        dataTypes,
+        filePrefix,
+        getComparatorForTable(
+            node.getOrderingScheme().getOrderingList(), sortItemIndexList, 
sortItemDataTypeList),
+        getComparatorForTable(
+            node.getOrderingScheme()
+                .getOrderingList()
+                .subList(0, node.getStreamCompareKeyEndIndex() + 1),
+            sortItemIndexList.subList(0, node.getStreamCompareKeyEndIndex() + 
1),
+            sortItemDataTypeList.subList(0, node.getStreamCompareKeyEndIndex() 
+ 1)),
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index c39e4f6ea18..1bd6892541c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -37,24 +37,29 @@ public class SeriesScanOptions {
 
   private Filter globalTimeFilter;
 
-  private Filter pushDownFilter;
+  private final Filter pushDownFilter;
 
   private final long pushDownLimit;
   private final long pushDownOffset;
 
   private final Set<String> allSensors;
 
+  private final boolean pushLimitToEachDevice;
+  private PaginationController paginationController;
+
   public SeriesScanOptions(
       Filter globalTimeFilter,
       Filter pushDownFilter,
       long pushDownLimit,
       long pushDownOffset,
-      Set<String> allSensors) {
+      Set<String> allSensors,
+      boolean pushLimitToEachDevice) {
     this.globalTimeFilter = globalTimeFilter;
     this.pushDownFilter = pushDownFilter;
     this.pushDownLimit = pushDownLimit;
     this.pushDownOffset = pushDownOffset;
     this.allSensors = allSensors;
+    this.pushLimitToEachDevice = pushLimitToEachDevice;
   }
 
   public static SeriesScanOptions getDefaultSeriesScanOptions(IFullPath 
seriesPath) {
@@ -82,7 +87,14 @@ public class SeriesScanOptions {
   }
 
   public PaginationController getPaginationController() {
-    return new PaginationController(pushDownLimit, pushDownOffset);
+    if (pushLimitToEachDevice) {
+      return new PaginationController(pushDownLimit, pushDownOffset);
+    } else {
+      if (paginationController == null) {
+        paginationController = new PaginationController(pushDownLimit, 
pushDownOffset);
+      }
+      return paginationController;
+    }
   }
 
   public void setTTL(long dataTTL) {
@@ -114,6 +126,8 @@ public class SeriesScanOptions {
 
     private Set<String> allSensors;
 
+    private boolean pushLimitToEachDevice = true;
+
     public Builder withGlobalTimeFilter(Filter globalTimeFilter) {
       this.globalTimeFilter = globalTimeFilter;
       return this;
@@ -134,13 +148,23 @@ public class SeriesScanOptions {
       return this;
     }
 
+    public Builder withPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+      this.pushLimitToEachDevice = pushLimitToEachDevice;
+      return this;
+    }
+
     public void withAllSensors(Set<String> allSensors) {
       this.allSensors = allSensors;
     }
 
     public SeriesScanOptions build() {
       return new SeriesScanOptions(
-          globalTimeFilter, pushDownFilter, pushDownLimit, pushDownOffset, 
allSensors);
+          globalTimeFilter,
+          pushDownFilter,
+          pushDownLimit,
+          pushDownOffset,
+          allSensors,
+          pushLimitToEachDevice);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
index c27b176bd9f..9a52f7464f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
@@ -168,4 +168,8 @@ public class DiskSpiller {
   public int getFileSize() {
     return fileIndex;
   }
+
+  public void reset() {
+    fileIndex = 0;
+  }
 }


Reply via email to