This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this 
push:
     new 411255134b1 Add support for StreamSortOperator
411255134b1 is described below

commit 411255134b116640412bbac41a0b7e1aa2ef0c79
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Jun 7 17:32:02 2024 +0800

    Add support for StreamSortOperator
---
 ...SortOperator.java => AbstractSortOperator.java} | 146 +++--
 .../execution/operator/process/SortOperator.java   | 309 +----------
 .../operator/process/StreamSortOperator.java       | 203 +++++++
 .../plan/planner/TableOperatorGenerator.java       |  53 ++
 .../planner/plan/parameter/SeriesScanOptions.java  |  32 +-
 .../apache/iotdb/db/utils/sort/DiskSpiller.java    |  17 +-
 .../iotdb/db/utils/sort/SortBufferManager.java     |  33 +-
 .../execution/operator/SortOperatorTest.java       |  47 +-
 .../operator/process/StreamSortOperatorTest.java   | 591 +++++++++++++++++++++
 .../apache/iotdb/db/utils/sort/SortUtilTest.java   |   6 +-
 10 files changed, 1009 insertions(+), 428 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
similarity index 77%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index e34112c2903..ca152e6d77b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -17,14 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.operator.process.relational;
+package org.apache.iotdb.db.queryengine.execution.operator.process;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
-import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
 import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -40,7 +38,6 @@ import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
-import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,17 +47,13 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
+public abstract class AbstractSortOperator implements ProcessOperator {
 
-public class StreamSortOperator implements ProcessOperator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSortOperator.class);
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamSortOperator.class);
-
-  private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
-  private final OperatorContext operatorContext;
-  private final Operator inputOperator;
-  private final TsBlockBuilder tsBlockBuilder;
+  protected final OperatorContext operatorContext;
+  protected final Operator inputOperator;
+  protected final TsBlockBuilder tsBlockBuilder;
 
   // Use to output the result in memory.
   // Because the memory may be larger than tsBlockBuilder's max size
@@ -71,22 +64,22 @@ public class StreamSortOperator implements ProcessOperator {
   private final Comparator<SortKey> comparator;
   private long cachedBytes;
   private final DiskSpiller diskSpiller;
-  private final SortBufferManager sortBufferManager;
+  private SortBufferManager sortBufferManager;
 
   // For mergeSort
 
   private MergeSortHeap mergeSortHeap;
   private List<SortReader> sortReaders;
-  private boolean[] noMoreData;
+  protected boolean[] noMoreData;
 
   private final int maxReturnSize =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
-  private long prepareUntilReadyCost = 0;
-  private long dataSize = 0;
+  protected long prepareUntilReadyCost = 0;
+  protected long dataSize = 0;
   private long sortCost = 0;
 
-  public StreamSortOperator(
+  AbstractSortOperator(
       OperatorContext operatorContext,
       Operator inputOperator,
       List<TSDataType> dataTypes,
@@ -100,7 +93,30 @@ public class StreamSortOperator implements ProcessOperator {
     this.cachedBytes = 0;
     this.diskSpiller =
         new DiskSpiller(folderPath, folderPath + 
operatorContext.getOperatorId(), dataTypes);
-    this.sortBufferManager = new SortBufferManager();
+    this.sortBufferManager =
+        new SortBufferManager(
+            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+            IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
+  }
+
+  protected void buildResult() throws IoTDBException {
+    if (diskSpiller.hasSpilledData()) {
+      try {
+        prepareSortReaders();
+        mergeSort();
+      } catch (Exception e) {
+        clear();
+        throw e;
+      }
+    } else {
+      if (curRow == -1) {
+        long startTime = System.nanoTime();
+        cachedData.sort(comparator);
+        sortCost += System.nanoTime() - startTime;
+        curRow = 0;
+      }
+      buildTsBlockInMemory();
+    }
   }
 
   @Override
@@ -113,45 +129,6 @@ public class StreamSortOperator implements ProcessOperator 
{
     return inputOperator.isBlocked();
   }
 
-  @Override
-  public TsBlock next() throws Exception {
-    if (!inputOperator.hasNextWithTimer()) {
-      if (diskSpiller.hasSpilledData()) {
-        try {
-          prepareSortReaders();
-          return mergeSort();
-        } catch (Exception e) {
-          clear();
-          throw e;
-        }
-      } else {
-        if (curRow == -1) {
-          long startTime = System.nanoTime();
-          cachedData.sort(comparator);
-          sortCost += System.nanoTime() - startTime;
-          curRow = 0;
-        }
-        return buildTsBlockInMemory();
-      }
-    }
-    long startTime = System.nanoTime();
-    try {
-      TsBlock tsBlock = inputOperator.nextWithTimer();
-      if (tsBlock == null) {
-        return null;
-      }
-      dataSize += tsBlock.getRetainedSizeInBytes();
-      cacheTsBlock(tsBlock);
-    } catch (IoTDBException e) {
-      clear();
-      throw e;
-    } finally {
-      prepareUntilReadyCost += System.nanoTime() - startTime;
-    }
-
-    return null;
-  }
-
   private void recordMetrics() {
     operatorContext.recordSpecifiedInfo("prepareCost/ns", 
Long.toString(prepareUntilReadyCost));
     operatorContext.recordSpecifiedInfo("sortedDataSize", 
Long.toString(dataSize));
@@ -185,9 +162,9 @@ public class StreamSortOperator implements ProcessOperator {
     noMoreData = new boolean[sortReaders.size()];
   }
 
-  private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+  protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
     long bytesSize = tsBlock.getRetainedSizeInBytes();
-    if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
+    if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) {
       cachedBytes += bytesSize;
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
         cachedData.add(new MergeSortKey(tsBlock, i));
@@ -211,8 +188,7 @@ public class StreamSortOperator implements ProcessOperator {
     diskSpiller.spillSortedData(cachedData);
   }
 
-  private TsBlock buildTsBlockInMemory() {
-    tsBlockBuilder.reset();
+  private void buildTsBlockInMemory() {
     TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
     for (int i = curRow; i < cachedData.size(); i++) {
@@ -232,11 +208,9 @@ public class StreamSortOperator implements ProcessOperator 
{
         break;
       }
     }
-
-    return tsBlockBuilder.build();
   }
 
-  private TsBlock mergeSort() throws IoTDBException {
+  private void mergeSort() throws IoTDBException {
 
     // 1. fill the input from each reader
     initMergeSortHeap();
@@ -245,7 +219,6 @@ public class StreamSortOperator implements ProcessOperator {
     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 
     // 2. do merge sort until one TsBlock is consumed up
-    tsBlockBuilder.reset();
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
     while (!mergeSortHeap.isEmpty()) {
@@ -277,7 +250,6 @@ public class StreamSortOperator implements ProcessOperator {
       }
     }
     sortCost += System.nanoTime() - startTime;
-    return tsBlockBuilder.build();
   }
 
   private void initMergeSortHeap() throws IoTDBException {
@@ -329,15 +301,22 @@ public class StreamSortOperator implements 
ProcessOperator {
           sortReader.close();
         }
       }
+      sortReaders = null;
+      diskSpiller.reset();
     } catch (Exception e) {
-      LOGGER.error("Fail to close fileChannel", e);
+      LOGGER.warn("Fail to close fileChannel", e);
     }
   }
 
   @Override
   public boolean hasNext() throws Exception {
-    return inputOperator.hasNextWithTimer()
-        || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+    return inputOperator.hasNextWithTimer() || hasMoreSortedData();
+  }
+
+  protected boolean hasMoreSortedData() {
+    return (!diskSpiller.hasSpilledData()
+            && ((curRow == -1 && !cachedData.isEmpty())
+                || (curRow != -1 && curRow != cachedData.size())))
         || (diskSpiller.hasSpilledData() && hasMoreData());
   }
 
@@ -358,7 +337,7 @@ public class StreamSortOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     return inputOperator.calculateMaxPeekMemoryWithCounter()
         + inputOperator.calculateRetainedSizeAfterCallingNext()
-        + SORT_BUFFER_SIZE;
+        + sortBufferManager.getSortBufferSize();
   }
 
   @Override
@@ -368,15 +347,22 @@ public class StreamSortOperator implements 
ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return inputOperator.calculateRetainedSizeAfterCallingNext() + 
SORT_BUFFER_SIZE;
+    return inputOperator.calculateRetainedSizeAfterCallingNext()
+        + sortBufferManager.getSortBufferSize();
   }
 
-  @Override
-  public long ramBytesUsed() {
-    return INSTANCE_SIZE
-        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
-        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
-        + RamUsageEstimator.sizeOf(noMoreData)
-        + tsBlockBuilder.getRetainedSizeInBytes();
+  protected void resetSortRelatedResource() {
+    curRow = -1;
+    cachedData = new ArrayList<>();
+    cachedBytes = 0;
+    clear();
+    sortBufferManager =
+        new SortBufferManager(
+            sortBufferManager.getMaxTsBlockSizeInBytes(), 
sortBufferManager.getSortBufferSize());
+    if (mergeSortHeap != null && !mergeSortHeap.isEmpty()) {
+      throw new IllegalStateException("mergeSortHeap should be empty!");
+    }
+    mergeSortHeap = null;
+    noMoreData = null;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
index 55453353f0d..0f9ca2bd848 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
@@ -23,66 +23,19 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
-import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
-import org.apache.iotdb.db.utils.sort.DiskSpiller;
-import org.apache.iotdb.db.utils.sort.MemoryReader;
-import org.apache.iotdb.db.utils.sort.SortBufferManager;
-import org.apache.iotdb.db.utils.sort.SortReader;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.RamUsageEstimator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
-
-public class SortOperator implements ProcessOperator {
+public class SortOperator extends AbstractSortOperator {
 
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
-  private final OperatorContext operatorContext;
-  private final Operator inputOperator;
-  private final TsBlockBuilder tsBlockBuilder;
-
-  // Use to output the result in memory.
-  // Because the memory may be larger than tsBlockBuilder's max size
-  // so the data may be return in multiple times.
-  private int curRow = -1;
-
-  private List<SortKey> cachedData;
-  private final Comparator<SortKey> comparator;
-  private long cachedBytes;
-  private final DiskSpiller diskSpiller;
-  private final SortBufferManager sortBufferManager;
-
-  // For mergeSort
-
-  private MergeSortHeap mergeSortHeap;
-  private List<SortReader> sortReaders;
-  private boolean[] noMoreData;
-
-  private static final Logger logger = 
LoggerFactory.getLogger(SortOperator.class);
-
-  private final int maxReturnSize =
-      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
-  private long prepareUntilReadyCost = 0;
-  private long dataSize = 0;
-  private long sortCost = 0;
 
   public SortOperator(
       OperatorContext operatorContext,
@@ -90,47 +43,16 @@ public class SortOperator implements ProcessOperator {
       List<TSDataType> dataTypes,
       String folderPath,
       Comparator<SortKey> comparator) {
-    this.operatorContext = operatorContext;
-    this.inputOperator = inputOperator;
-    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.cachedData = new ArrayList<>();
-    this.comparator = comparator;
-    this.cachedBytes = 0;
-    this.diskSpiller =
-        new DiskSpiller(folderPath, folderPath + 
operatorContext.getOperatorId(), dataTypes);
-    this.sortBufferManager = new SortBufferManager();
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return inputOperator.isBlocked();
+    super(operatorContext, inputOperator, dataTypes, folderPath, comparator);
   }
 
   @Override
   public TsBlock next() throws Exception {
     if (!inputOperator.hasNextWithTimer()) {
-      if (diskSpiller.hasSpilledData()) {
-        try {
-          prepareSortReaders();
-          return mergeSort();
-        } catch (Exception e) {
-          clear();
-          throw e;
-        }
-      } else {
-        if (curRow == -1) {
-          long startTime = System.nanoTime();
-          cachedData.sort(comparator);
-          sortCost += System.nanoTime() - startTime;
-          curRow = 0;
-        }
-        return buildTsBlockInMemory();
-      }
+      buildResult();
+      TsBlock res = tsBlockBuilder.build();
+      tsBlockBuilder.reset();
+      return res;
     }
     long startTime = System.nanoTime();
     try {
@@ -150,225 +72,6 @@ public class SortOperator implements ProcessOperator {
     return null;
   }
 
-  private void recordMetrics() {
-    operatorContext.recordSpecifiedInfo("prepareCost/ns", 
Long.toString(prepareUntilReadyCost));
-    operatorContext.recordSpecifiedInfo("sortedDataSize", 
Long.toString(dataSize));
-    operatorContext.recordSpecifiedInfo("sortCost/ns", 
Long.toString(sortCost));
-    int spilledFileSize = diskSpiller.getFileSize();
-    if (spilledFileSize > 0) {
-      operatorContext.recordSpecifiedInfo(
-          "merge sort branch", Integer.toString(diskSpiller.getFileSize() + 
1));
-    }
-  }
-
-  private void prepareSortReaders() throws IoTDBException {
-    if (sortReaders != null) {
-      return;
-    }
-    sortReaders = new ArrayList<>();
-    if (cachedBytes != 0) {
-      cachedData.sort(comparator);
-      if (sortBufferManager.allocate(cachedBytes)) {
-        sortReaders.add(
-            new MemoryReader(
-                
cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList())));
-      } else {
-        sortBufferManager.allocateOneSortBranch();
-        diskSpiller.spillSortedData(cachedData);
-        cachedData = null;
-      }
-    }
-    sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
-    // if reader is finished
-    noMoreData = new boolean[sortReaders.size()];
-  }
-
-  private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
-    long bytesSize = tsBlock.getRetainedSizeInBytes();
-    if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
-      cachedBytes += bytesSize;
-      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-        cachedData.add(new MergeSortKey(tsBlock, i));
-      }
-    } else {
-      cachedData.sort(comparator);
-      spill();
-      cachedData.clear();
-      cachedBytes = bytesSize;
-      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-        cachedData.add(new MergeSortKey(tsBlock, i));
-      }
-    }
-  }
-
-  private void spill() throws IoTDBException {
-    // if current memory cannot put this tsBlock, an exception will be thrown 
in spillSortedData()
-    // because there should be at least 
tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
-    // one branch.
-    sortBufferManager.allocateOneSortBranch();
-    diskSpiller.spillSortedData(cachedData);
-  }
-
-  private TsBlock buildTsBlockInMemory() {
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
-    for (int i = curRow; i < cachedData.size(); i++) {
-      SortKey sortKey = cachedData.get(i);
-      TsBlock tsBlock = sortKey.tsBlock;
-      timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex));
-      for (int j = 0; j < valueColumnBuilders.length; j++) {
-        if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) {
-          valueColumnBuilders[j].appendNull();
-          continue;
-        }
-        valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex);
-      }
-      tsBlockBuilder.declarePosition();
-      curRow++;
-      if (tsBlockBuilder.isFull()) {
-        break;
-      }
-    }
-
-    return tsBlockBuilder.build();
-  }
-
-  private TsBlock mergeSort() throws IoTDBException {
-
-    // 1. fill the input from each reader
-    initMergeSortHeap();
-
-    long startTime = System.nanoTime();
-    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-
-    // 2. do merge sort until one TsBlock is consumed up
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
-    while (!mergeSortHeap.isEmpty()) {
-
-      MergeSortKey mergeSortKey = mergeSortHeap.poll();
-      TsBlock targetBlock = mergeSortKey.tsBlock;
-      timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
-      for (int i = 0; i < valueColumnBuilders.length; i++) {
-        if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
-          valueColumnBuilders[i].appendNull();
-          continue;
-        }
-        valueColumnBuilders[i].write(targetBlock.getColumn(i), 
mergeSortKey.rowIndex);
-      }
-      tsBlockBuilder.declarePosition();
-
-      int readerIndex = mergeSortKey.inputChannelIndex;
-      mergeSortKey = readNextMergeSortKey(readerIndex);
-      if (mergeSortKey != null) {
-        mergeSortHeap.push(mergeSortKey);
-      } else {
-        noMoreData[readerIndex] = true;
-        sortBufferManager.releaseOneSortBranch();
-      }
-
-      // break if time is out or tsBlockBuilder is full or sortBuffer is not 
enough
-      if (System.nanoTime() - startTime > maxRuntime || 
tsBlockBuilder.isFull()) {
-        break;
-      }
-    }
-    sortCost += System.nanoTime() - startTime;
-    return tsBlockBuilder.build();
-  }
-
-  private void initMergeSortHeap() throws IoTDBException {
-    if (mergeSortHeap == null) {
-      mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
-      for (int i = 0; i < sortReaders.size(); i++) {
-        SortReader sortReader = sortReaders.get(i);
-        if (sortReader.hasNext()) {
-          MergeSortKey mergeSortKey = sortReader.next();
-          mergeSortKey.inputChannelIndex = i;
-          mergeSortHeap.push(mergeSortKey);
-        } else {
-          noMoreData[i] = true;
-          sortBufferManager.releaseOneSortBranch();
-        }
-      }
-    }
-  }
-
-  private MergeSortKey readNextMergeSortKey(int readerIndex) throws 
IoTDBException {
-    SortReader sortReader = sortReaders.get(readerIndex);
-    if (sortReader.hasNext()) {
-      MergeSortKey mergeSortKey = sortReader.next();
-      mergeSortKey.inputChannelIndex = readerIndex;
-      return mergeSortKey;
-    }
-    return null;
-  }
-
-  private boolean hasMoreData() {
-    if (noMoreData == null) {
-      return true;
-    }
-    for (boolean noMore : noMoreData) {
-      if (!noMore) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void clear() {
-    if (!diskSpiller.hasSpilledData()) {
-      return;
-    }
-    try {
-      if (sortReaders != null) {
-        for (SortReader sortReader : sortReaders) {
-          sortReader.close();
-        }
-      }
-    } catch (Exception e) {
-      logger.error("Fail to close fileChannel", e);
-    }
-  }
-
-  @Override
-  public boolean hasNext() throws Exception {
-    return inputOperator.hasNextWithTimer()
-        || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
-        || (diskSpiller.hasSpilledData() && hasMoreData());
-  }
-
-  @Override
-  public void close() throws Exception {
-    recordMetrics();
-    cachedData = null;
-    clear();
-    inputOperator.close();
-  }
-
-  @Override
-  public boolean isFinished() throws Exception {
-    return !this.hasNextWithTimer();
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return inputOperator.calculateMaxPeekMemoryWithCounter()
-        + inputOperator.calculateRetainedSizeAfterCallingNext()
-        + SORT_BUFFER_SIZE;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return inputOperator.calculateRetainedSizeAfterCallingNext() + 
SORT_BUFFER_SIZE;
-  }
-
   @Override
   public long ramBytesUsed() {
     return INSTANCE_SIZE
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
new file mode 100644
index 00000000000..d53a3480f93
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Comparator;
+import java.util.List;
+
+public class StreamSortOperator extends AbstractSortOperator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(StreamSortOperator.class);
+
+  private final Comparator<SortKey> streamSortComparator;
+
+  private final int minLinesToOutput;
+
+  // accumulated unprinted line count
+  private long remainingCount = 0;
+
+  // child operator has no more data
+  private boolean noMoreDataFromChild = false;
+
+  // unprocessed data from the previous iteration
+  private TsBlock currentTsBlock = null;
+
+  // be able to start stream outputting partial data
+  private boolean canStreamOutput = false;
+
+  private SortKey lastRow = null;
+
+  public StreamSortOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> dataTypes,
+      String folderPath,
+      Comparator<SortKey> comparator,
+      Comparator<SortKey> streamSortComparator,
+      int minLinesToOutput) {
+    super(operatorContext, inputOperator, dataTypes, folderPath, comparator);
+    this.streamSortComparator = streamSortComparator;
+    this.minLinesToOutput = minLinesToOutput;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (canStreamOutput || !tsBlockBuilder.isEmpty()) {
+
+      buildResult();
+
+      // this time stream output data has been consumed up
+      if (!hasMoreSortedData()) {
+        canStreamOutput = false;
+        // clear spilled disk space and memory data structure
+        resetSortRelatedResource();
+      }
+
+      if (tsBlockBuilder.isFull() || consumedUp()) {
+        TsBlock res = tsBlockBuilder.build();
+        remainingCount -= res.getPositionCount();
+        tsBlockBuilder.reset();
+        return res;
+      }
+    }
+
+    if (currentTsBlock != null) {
+      cacheTsBlock(currentTsBlock);
+      currentTsBlock = null;
+    }
+
+    long startTime = System.nanoTime();
+    if (!inputOperator.hasNextWithTimer()) {
+      noMoreDataFromChild = true;
+      canStreamOutput = true;
+    } else {
+      try {
+        // init currentTsBlock from child operator
+        currentTsBlock = inputOperator.nextWithTimer();
+        if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+          currentTsBlock = null;
+          return null;
+        }
+        // record total sorted data size
+        dataSize += currentTsBlock.getRetainedSizeInBytes();
+
+        // if currentTsBlock line count + remainingCount is still less than 
minLinesToOutput, just
+        // cache it
+        if (currentTsBlock.getPositionCount() + remainingCount < 
minLinesToOutput) {
+          cacheTsBlock(currentTsBlock);
+          remainingCount += currentTsBlock.getPositionCount();
+          currentTsBlock = null;
+        } else {
+          // if stream compare key of the last row of currentTsBlock is same 
as the last row of
+          // previous TsBlock, we cannot stream output, just cache it
+          if (isStreamCompareKeySame()) {
+            cacheTsBlock(currentTsBlock);
+            remainingCount += currentTsBlock.getPositionCount();
+            currentTsBlock = null;
+          } else {
+            // traverse the current TsBlock backward until encountering a row 
with a StreamCompKey
+            // different from the last row, and return the index of that row
+            int endIndex = getEndIndexFromCurrentTsBlock();
+            // if total count of `canOutput` lines is less than 
minLinesToOutput, we won't output
+            // them, we just cache the whole currentTsBlock
+            if (endIndex == -1 || endIndex + remainingCount + 1 < 
minLinesToOutput) {
+              cacheTsBlock(currentTsBlock);
+              remainingCount += currentTsBlock.getPositionCount();
+              currentTsBlock = null;
+            } else {
+              // `canOutput` lines count is larger than minLinesToOutput, so 
we can stream output
+              cacheTsBlock(currentTsBlock.getRegion(0, endIndex + 1));
+              remainingCount += currentTsBlock.getPositionCount();
+              canStreamOutput = true;
+              currentTsBlock = currentTsBlock.subTsBlock(endIndex + 1);
+            }
+          }
+        }
+      } catch (IoTDBException e) {
+        clear();
+        throw e;
+      } finally {
+        prepareUntilReadyCost += System.nanoTime() - startTime;
+      }
+    }
+    return null;
+  }
+
+  private boolean consumedUp() {
+    return remainingCount == tsBlockBuilder.getPositionCount() && 
noMoreDataFromChild;
+  }
+
+  // return -1, if not found
+  private int getEndIndexFromCurrentTsBlock() {
+    SortKey sortKeyOfLastRow = new SortKey(currentTsBlock, 
currentTsBlock.getPositionCount() - 1);
+    SortKey sortKeyOfFoundRow = new SortKey(currentTsBlock, 
currentTsBlock.getPositionCount() - 2);
+    for (int index = currentTsBlock.getPositionCount() - 2; index >= 0; 
index--) {
+      sortKeyOfFoundRow.rowIndex = index;
+      if (streamSortComparator.compare(sortKeyOfFoundRow, sortKeyOfLastRow) != 
0) {
+        return index;
+      }
+    }
+    return -1;
+  }
+
+  private boolean isStreamCompareKeySame() {
+    return lastRow == null
+        || streamSortComparator.compare(
+                lastRow, new SortKey(currentTsBlock, 
currentTsBlock.getPositionCount() - 1))
+            == 0;
+  }
+
+  protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+    super.cacheTsBlock(tsBlock);
+    lastRow = new SortKey(tsBlock, tsBlock.getPositionCount() - 1);
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return super.hasNext() || !tsBlockBuilder.isEmpty() || currentTsBlock != 
null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    lastRow = null;
+    currentTsBlock = null;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + RamUsageEstimator.sizeOf(noMoreData)
+        + tsBlockBuilder.getRetainedSizeInBytes();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 73b333a8c86..d7d62ec2e86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
@@ -63,6 +64,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Expression;
@@ -240,6 +242,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     SeriesScanOptions.Builder scanOptionsBuilder = 
getSeriesScanOptionsBuilder(context);
     scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
     scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
+    
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
     scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
 
     Expression pushDownPredicate = node.getPushDownPredicate();
@@ -711,4 +714,54 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
               }
             });
   }
+
+  @Override
+  public Operator visitStreamSort(StreamSortNode node, 
LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                StreamSortNode.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
+
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount);
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount);
+    genSortInformation(
+        node.getOutputSymbols(),
+        node.getOrderingScheme(),
+        sortItemIndexList,
+        sortItemDataTypeList,
+        context.getTypeProvider());
+
+    String filePrefix =
+        IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+            + File.separator
+            + 
operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId()
+            + File.separator
+            + operatorContext.getDriverContext().getPipelineId()
+            + File.separator;
+
+    context.getDriverContext().setHaveTmpFile(true);
+    
context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true);
+
+    Operator child = node.getChild().accept(this, context);
+
+    return new StreamSortOperator(
+        operatorContext,
+        child,
+        dataTypes,
+        filePrefix,
+        getComparatorForTable(
+            node.getOrderingScheme().getOrderingList(), sortItemIndexList, 
sortItemDataTypeList),
+        getComparatorForTable(
+            node.getOrderingScheme()
+                .getOrderingList()
+                .subList(0, node.getStreamCompareKeyEndIndex() + 1),
+            sortItemIndexList.subList(0, node.getStreamCompareKeyEndIndex() + 
1),
+            sortItemDataTypeList.subList(0, node.getStreamCompareKeyEndIndex() 
+ 1)),
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index c39e4f6ea18..1bd6892541c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -37,24 +37,29 @@ public class SeriesScanOptions {
 
   private Filter globalTimeFilter;
 
-  private Filter pushDownFilter;
+  private final Filter pushDownFilter;
 
   private final long pushDownLimit;
   private final long pushDownOffset;
 
   private final Set<String> allSensors;
 
+  private final boolean pushLimitToEachDevice;
+  private PaginationController paginationController;
+
   public SeriesScanOptions(
       Filter globalTimeFilter,
       Filter pushDownFilter,
       long pushDownLimit,
       long pushDownOffset,
-      Set<String> allSensors) {
+      Set<String> allSensors,
+      boolean pushLimitToEachDevice) {
     this.globalTimeFilter = globalTimeFilter;
     this.pushDownFilter = pushDownFilter;
     this.pushDownLimit = pushDownLimit;
     this.pushDownOffset = pushDownOffset;
     this.allSensors = allSensors;
+    this.pushLimitToEachDevice = pushLimitToEachDevice;
   }
 
   public static SeriesScanOptions getDefaultSeriesScanOptions(IFullPath 
seriesPath) {
@@ -82,7 +87,14 @@ public class SeriesScanOptions {
   }
 
   public PaginationController getPaginationController() {
-    return new PaginationController(pushDownLimit, pushDownOffset);
+    if (pushLimitToEachDevice) {
+      return new PaginationController(pushDownLimit, pushDownOffset);
+    } else {
+      if (paginationController == null) {
+        paginationController = new PaginationController(pushDownLimit, 
pushDownOffset);
+      }
+      return paginationController;
+    }
   }
 
   public void setTTL(long dataTTL) {
@@ -114,6 +126,8 @@ public class SeriesScanOptions {
 
     private Set<String> allSensors;
 
+    private boolean pushLimitToEachDevice = true;
+
     public Builder withGlobalTimeFilter(Filter globalTimeFilter) {
       this.globalTimeFilter = globalTimeFilter;
       return this;
@@ -134,13 +148,23 @@ public class SeriesScanOptions {
       return this;
     }
 
+    public Builder withPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+      this.pushLimitToEachDevice = pushLimitToEachDevice;
+      return this;
+    }
+
     public void withAllSensors(Set<String> allSensors) {
       this.allSensors = allSensors;
     }
 
     public SeriesScanOptions build() {
       return new SeriesScanOptions(
-          globalTimeFilter, pushDownFilter, pushDownLimit, pushDownOffset, 
allSensors);
+          globalTimeFilter,
+          pushDownFilter,
+          pushDownLimit,
+          pushDownOffset,
+          allSensors,
+          pushLimitToEachDevice);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
index c27b176bd9f..b0d82725346 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
@@ -104,11 +104,16 @@ public class DiskSpiller {
     }
   }
 
-  private void writeData(List<TsBlock> sortedData, String fileName)
-      throws IOException, IoTDBException {
+  private void writeData(List<TsBlock> sortedData, String fileName) throws 
IoTDBException {
     Path filePath = Paths.get(fileName);
-    Files.createFile(filePath);
-    try (FileChannel fileChannel = FileChannel.open(filePath, 
StandardOpenOption.WRITE)) {
+    // for stream sort we may reuse the previous tmp file name, so we need 
TRUNCATE_EXISTING and
+    // CREATE
+    try (FileChannel fileChannel =
+        FileChannel.open(
+            filePath,
+            StandardOpenOption.WRITE,
+            StandardOpenOption.TRUNCATE_EXISTING,
+            StandardOpenOption.CREATE)) {
       for (TsBlock tsBlock : sortedData) {
         ByteBuffer tsBlockBuffer = serde.serialize(tsBlock);
         ByteBuffer length = ByteBuffer.allocate(4);
@@ -168,4 +173,8 @@ public class DiskSpiller {
   public int getFileSize() {
     return fileIndex;
   }
+
+  public void reset() {
+    fileIndex = 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
index 8f2de51db7f..579f2a99b14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
@@ -19,33 +19,30 @@
 
 package org.apache.iotdb.db.utils.sort;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import org.apache.tsfile.common.conf.TSFileDescriptor;
-
 public class SortBufferManager {
-  private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
-      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
-  public static final long SORT_BUFFER_SIZE =
-      IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+  private final int maxTsBlockSizeInBytes;
+  private final long sortBufferSize;
 
   private long bufferUsed;
 
-  private static final long BUFFER_SIZE_FOR_ONE_BRANCH = 
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  private final long bufferSizeForOneBranch;
 
   private final long bufferAvailableForAllBranch;
   private long readerBuffer = 0;
   private long branchNum = 0;
 
-  public SortBufferManager() {
-    this.bufferAvailableForAllBranch = SORT_BUFFER_SIZE - 
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  public SortBufferManager(int maxTsBlockSizeInBytes, long sortBufferSize) {
+    this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+    this.sortBufferSize = sortBufferSize;
+    this.bufferAvailableForAllBranch = sortBufferSize - maxTsBlockSizeInBytes;
+    this.bufferSizeForOneBranch = maxTsBlockSizeInBytes;
     // the initial value is the buffer for output.
-    this.bufferUsed = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    this.bufferUsed = maxTsBlockSizeInBytes;
   }
 
   public void allocateOneSortBranch() {
-    boolean success = allocate(BUFFER_SIZE_FOR_ONE_BRANCH);
+    boolean success = allocate(bufferSizeForOneBranch);
     if (!success) {
       throw new IllegalArgumentException("Not enough memory for sorting");
     }
@@ -53,7 +50,7 @@ public class SortBufferManager {
   }
 
   private boolean check(long size) {
-    return bufferUsed + size < SORT_BUFFER_SIZE;
+    return bufferUsed + size < sortBufferSize;
   }
 
   public boolean allocate(long size) {
@@ -78,4 +75,12 @@ public class SortBufferManager {
     readerBuffer = bufferAvailableForAllBranch / branchNum;
     return readerBuffer;
   }
+
+  public int getMaxTsBlockSizeInBytes() {
+    return maxTsBlockSizeInBytes;
+  }
+
+  public long getSortBufferSize() {
+    return sortBufferSize;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
index da098b19801..99acfd45366 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -79,6 +80,9 @@ public class SortOperatorTest {
   private final List<TsFileResource> seqResources = new ArrayList<>();
   private final List<TsFileResource> unSeqResources = new ArrayList<>();
 
+  private final String sortTmpPrefixPath =
+      "target" + File.separator + "sort" + File.separator + "tmp";
+
   private int dataNodeId;
 
   private int maxTsBlockSizeInBytes;
@@ -96,6 +100,7 @@ public class SortOperatorTest {
   @After
   public void tearDown() throws IOException {
     SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    cleanDir(sortTmpPrefixPath);
     IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
     
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
   }
@@ -197,7 +202,7 @@ public class SortOperatorTest {
 
     OperatorContext operatorContext = 
driverContext.getOperatorContexts().get(3);
     String filePrefix =
-        "target"
+        sortTmpPrefixPath
             + File.separator
             + operatorContext
                 .getDriverContext()
@@ -228,10 +233,9 @@ public class SortOperatorTest {
   // with data spilling
   @Test
   public void sortOperatorSpillingTest() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
     long sortBufferSize = 
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
-    try {
-      IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
-      SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, true);
+    try (SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, 
true)) {
       int lastValue = -1;
       int count = 0;
       while (root.isBlocked().isDone() && root.hasNext()) {
@@ -248,7 +252,6 @@ public class SortOperatorTest {
           count++;
         }
       }
-      root.close();
       assertEquals(500, count);
     } finally {
       
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
@@ -258,24 +261,24 @@ public class SortOperatorTest {
   // no data spilling
   @Test
   public void sortOperatorNormalTest() throws Exception {
-    Operator root = genSortOperator(Ordering.ASC, true);
-    int lastValue = -1;
-    int count = 0;
-    while (root.isBlocked().isDone() && root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock == null) continue;
-      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-        long time = tsBlock.getTimeByIndex(i);
-        int v1 = tsBlock.getColumn(0).getInt(i);
-        int v2 = tsBlock.getColumn(1).getInt(i);
-        assertTrue(lastValue == -1 || lastValue < v1);
-        assertEquals(getValue(time), v1);
-        assertEquals(v1, v2);
-        lastValue = v1;
-        count++;
+    try (Operator root = genSortOperator(Ordering.ASC, true)) {
+      int lastValue = -1;
+      int count = 0;
+      while (root.isBlocked().isDone() && root.hasNext()) {
+        TsBlock tsBlock = root.next();
+        if (tsBlock == null) continue;
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long time = tsBlock.getTimeByIndex(i);
+          int v1 = tsBlock.getColumn(0).getInt(i);
+          int v2 = tsBlock.getColumn(1).getInt(i);
+          assertTrue(lastValue == -1 || lastValue < v1);
+          assertEquals(getValue(time), v1);
+          assertEquals(v1, v2);
+          lastValue = v1;
+          count++;
+        }
       }
+      assertEquals(500, count);
     }
-    root.close();
-    assertEquals(500, count);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
new file mode 100644
index 00000000000..f273a183604
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class StreamSortOperatorTest {
+
+  private static final String sortTmpPrefixPath =
+      "target" + File.separator + "sort" + File.separator + "tmp";
+
+  private static final ExecutorService instanceNotificationExecutor =
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"sortOperator-test-instance-notification");
+
+  private final long[] timeArray =
+      new long[] {
+        3L, 4L, 2L, 1L, 3L, 1L, 2L, 4L, 5L, 5L, 2L, 3L, 1L, 4L, 2L, 3L, 1L, 
4L, 5L, 1L, 2L, 3L, 4L,
+        5L, 4L, 1L, 2L, 5L, 3L, 5L, 4L, 3L, 2L, 1L
+      };
+  private final String[] column1Array =
+      new String[] {
+        null,
+        null,
+        null,
+        null,
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou"
+      };
+  private final boolean[] column1IsNull =
+      new boolean[] {
+        true, true, true, true, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false
+      };
+  private final String[] column2Array =
+      new String[] {
+        "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d2", "d2", 
"d2", "d2", "d2", "d1",
+        "d1", "d1", "d1", "d1", "d2", "d2", "d2", "d2", "d2", "d1", "d1", 
"d1", "d1", "d1", "d2",
+        "d2", "d2", "d2", "d2"
+      };
+  private final boolean[] column2IsNull =
+      new boolean[] {
+        false, false, false, false, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false
+      };
+  private final int[] column3Array =
+      new int[] {
+        6, 7, 8, 9, 0, 111, 112, 114, 115, 0, 121, 122, 123, 124, 0, 11, 12, 
14, 15, 21, 22, 23, 24,
+        25, 0, 11, 12, 13, 15, 21, 22, 23, 24, 25
+      };
+  private final boolean[] column3IsNull =
+      new boolean[] {
+        false, false, false, false, true, false, false, false, false, true, 
false, false, false,
+        false, true, false, false, false, false, false, false, false, false, 
false, true, false,
+        false, false, false, false, false, false, false, false
+      };
+
+  @After
+  public void cleanUp() throws IOException {
+    cleanDir(sortTmpPrefixPath);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    instanceNotificationExecutor.shutdown();
+  }
+
+  @Test
+  public void allInMemoryTest() {
+
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void allInMemoryTes2() {
+    int maxTsBlockLineNumber = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          assertEquals(2, tsBlock.getPositionCount());
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+    }
+  }
+
+  @Test
+  public void someInDiskTest() {
+
+    long sortBufferSize = 
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+    int maxTsBlockSizeInBytes =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+    }
+  }
+
+  @Test
+  public void someInDiskTest2() {
+    long sortBufferSize = 
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+    int maxTsBlockSizeInBytes =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    int maxTsBlockLineNumber = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          assertEquals(2, tsBlock.getPositionCount());
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+    }
+  }
+
+  private StreamSortOperator genStreamSortOperator(int maxLinesToOutput) {
+    // child output
+    // Time, city,       deviceId,   s1
+    // 1     null           d1       9
+    // 2     null           d1       8
+    // ---------------------- TsBlock-1
+    // 3     null           d1       6
+    // 4     null           d1       7
+    // 1     beijing        d1       111
+    // 2     beijing        d1       112
+    // 3     beijing        d1       null
+    // 4     beijing        d1       114
+    // 5     beijing        d1       115
+    // ---------------------- TsBlock-2
+    // 1     beijing        d2       123
+    // 2     beijing        d2       121
+    // 3     beijing        d2       122
+    // 4     beijing        d2       124
+    // 5     beijing        d2       null
+    // ---------------------- TsBlock-3
+    // null
+    // ---------------------- TsBlock-4
+    // 1     shanghai       d1       12
+    // 2     shanghai       d1       null
+    // 3     shanghai       d1       11
+    // 4     shanghai       d1       14
+    // 5     shanghai       d1       15
+    // ---------------------- TsBlock-5
+    // 1     shanghai       d2       21
+    // 2     shanghai       d2       22
+    // 3     shanghai       d2       23
+    // 4     shanghai       d2       24
+    // 5     shanghai       d2       25
+    // 1     yangzhou       d1       11
+    // 2     yangzhou       d1       12
+    // ---------------------- TsBlock-6
+    // 3     yangzhou       d1       15
+    // 4     yangzhou       d1       null
+    // 5     yangzhou       d1       13
+    // ---------------------- TsBlock-7
+    // empty
+    // ---------------------- TsBlock-8
+    // 1     yangzhou       d2       25
+    // 2     yangzhou       d2       24
+    // 3     yangzhou       d2       23
+    // 4     yangzhou       d2       22
+    // ---------------------- TsBlock-9
+    // 5     yangzhou       d2       21
+    // ---------------------- TsBlock-10
+
+    // Construct operator tree
+    QueryId queryId = new QueryId("stub_query");
+
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId1, 
TableScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    driverContext.addOperatorContext(2, planNodeId2, 
StreamSortOperator.class.getSimpleName());
+    Operator childOperator =
+        new Operator() {
+
+          private final long[][] timeArray =
+              new long[][] {
+                {1L, 2L},
+                {3L, 4L, 1L, 2L, 3L, 4L, 5L},
+                {1L, 2L, 3L, 4L, 5L},
+                null,
+                {1L, 2L, 3L, 4L, 5L},
+                {1L, 2L, 3L, 4L, 5L, 1L, 2L},
+                {3L, 4L, 5L},
+                {},
+                {1L, 2L, 3L, 4L},
+                {5L}
+              };
+
+          private final String[][] cityArray =
+              new String[][] {
+                {null, null},
+                {null, null, "beijing", "beijing", "beijing", "beijing", 
"beijing"},
+                {"beijing", "beijing", "beijing", "beijing", "beijing"},
+                null,
+                {"shanghai", "shanghai", "shanghai", "shanghai", "shanghai"},
+                {
+                  "shanghai", "shanghai", "shanghai", "shanghai", "shanghai", 
"yangzhou", "yangzhou"
+                },
+                {"yangzhou", "yangzhou", "yangzhou"},
+                {},
+                {"yangzhou", "yangzhou", "yangzhou", "yangzhou"},
+                {"yangzhou"}
+              };
+
+          private final String[][] deviceIdArray =
+              new String[][] {
+                {"d1", "d1"},
+                {"d1", "d1", "d1", "d1", "d1", "d1", "d1"},
+                {"d2", "d2", "d2", "d2", "d2"},
+                null,
+                {"d1", "d1", "d1", "d1", "d1"},
+                {"d2", "d2", "d2", "d2", "d2", "d1", "d1"},
+                {"d1", "d1", "d1"},
+                {},
+                {"d2", "d2", "d2", "d2"},
+                {"d2"}
+              };
+
+          private final int[][] valueArray =
+              new int[][] {
+                {9, 8},
+                {6, 7, 111, 112, 0, 114, 115},
+                {123, 121, 122, 124, 0},
+                null,
+                {12, 0, 11, 14, 15},
+                {21, 22, 23, 24, 25, 11, 12},
+                {15, 0, 13},
+                {},
+                {25, 24, 23, 22},
+                {21}
+              };
+
+          private final boolean[][] valueIsNull =
+              new boolean[][] {
+                {false, false},
+                {false, false, false, false, true, false, false},
+                {false, false, false, false, true},
+                null,
+                {false, true, false, false, false},
+                {false, false, false, false, false, false, false},
+                {false, true, false},
+                {},
+                {false, false, false, false},
+                {false}
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return driverContext.getOperatorContexts().get(0);
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(
+                    timeArray[index].length,
+                    Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, 
TSDataType.INT32));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (cityArray[index][i] == null) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder
+                    .getColumnBuilder(0)
+                    .writeBinary(new Binary(cityArray[index][i], 
TSFileConfig.STRING_CHARSET));
+              }
+              if (deviceIdArray[index][i] == null) {
+                builder.getColumnBuilder(1).appendNull();
+              } else {
+                builder
+                    .getColumnBuilder(1)
+                    .writeBinary(new Binary(deviceIdArray[index][i], 
TSFileConfig.STRING_CHARSET));
+              }
+              if (valueIsNull[index][i]) {
+                builder.getColumnBuilder(2).appendNull();
+              } else {
+                builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() throws Exception {
+            return index < valueIsNull.length;
+          }
+
+          @Override
+          public void close() throws Exception {}
+
+          @Override
+          public boolean isFinished() throws Exception {
+            return index >= valueIsNull.length;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+
+          @Override
+          public long ramBytesUsed() {
+            return 0;
+          }
+        };
+
+    OperatorContext operatorContext = 
driverContext.getOperatorContexts().get(1);
+    String filePrefix =
+        sortTmpPrefixPath
+            + File.separator
+            + operatorContext
+                .getDriverContext()
+                .getFragmentInstanceContext()
+                .getId()
+                .getFragmentInstanceId()
+            + File.separator
+            + operatorContext.getDriverContext().getPipelineId()
+            + File.separator;
+
+    List<SortOrder> sortOrderList =
+        Arrays.asList(
+            SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST, 
SortOrder.ASC_NULLS_FIRST);
+    List<Integer> sortItemIndexList = Arrays.asList(0, 1, 2);
+    List<TSDataType> sortItemDataTypeList =
+        Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32);
+
+    Comparator<SortKey> comparator =
+        getComparatorForTable(sortOrderList, sortItemIndexList, 
sortItemDataTypeList);
+    ;
+
+    Comparator<SortKey> streamKeyComparator =
+        getComparatorForTable(
+            sortOrderList.subList(0, 2),
+            sortItemIndexList.subList(0, 2),
+            sortItemDataTypeList.subList(0, 2));
+
+    return new StreamSortOperator(
+        operatorContext,
+        childOperator,
+        Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32),
+        filePrefix,
+        comparator,
+        streamKeyComparator,
+        maxLinesToOutput);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
index b164677ae6c..98d6f765d4c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.sort;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
 
@@ -94,7 +95,10 @@ public class SortUtilTest {
       sortKeyList.add(new SortKey(tsBlock, i));
     }
 
-    SortBufferManager sortBufferManager = new SortBufferManager();
+    SortBufferManager sortBufferManager =
+        new SortBufferManager(
+            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+            IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
     try {
       sortBufferManager.allocateOneSortBranch();
       diskSpiller.spillSortedData(sortKeyList);

Reply via email to