This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TableScanEachLimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableScanEachLimit by this 
push:
     new 22eeff0d7da Add support for StreamSortOperator
22eeff0d7da is described below

commit 22eeff0d7daf88c51611de6654fc202348447f98
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Jun 7 10:41:06 2024 +0800

    Add support for StreamSortOperator
---
 .../operator/process/AbstractSortOperator.java     |  27 +-
 .../apache/iotdb/db/utils/sort/DiskSpiller.java    |  13 +-
 .../iotdb/db/utils/sort/SortBufferManager.java     |  33 +-
 .../execution/operator/SortOperatorTest.java       |  47 +-
 .../operator/process/StreamSortOperatorTest.java   | 591 +++++++++++++++++++++
 .../apache/iotdb/db/utils/sort/SortUtilTest.java   |   6 +-
 6 files changed, 666 insertions(+), 51 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index bb8c7a16c2c..ca152e6d77b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.execution.operator.process;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
@@ -46,8 +47,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
-
 public abstract class AbstractSortOperator implements ProcessOperator {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSortOperator.class);
@@ -94,7 +93,10 @@ public abstract class AbstractSortOperator implements 
ProcessOperator {
     this.cachedBytes = 0;
     this.diskSpiller =
         new DiskSpiller(folderPath, folderPath + 
operatorContext.getOperatorId(), dataTypes);
-    this.sortBufferManager = new SortBufferManager();
+    this.sortBufferManager =
+        new SortBufferManager(
+            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+            IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
   }
 
   protected void buildResult() throws IoTDBException {
@@ -162,7 +164,7 @@ public abstract class AbstractSortOperator implements 
ProcessOperator {
 
   protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
     long bytesSize = tsBlock.getRetainedSizeInBytes();
-    if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
+    if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) {
       cachedBytes += bytesSize;
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
         cachedData.add(new MergeSortKey(tsBlock, i));
@@ -312,7 +314,9 @@ public abstract class AbstractSortOperator implements 
ProcessOperator {
   }
 
   protected boolean hasMoreSortedData() {
-    return (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+    return (!diskSpiller.hasSpilledData()
+            && ((curRow == -1 && !cachedData.isEmpty())
+                || (curRow != -1 && curRow != cachedData.size())))
         || (diskSpiller.hasSpilledData() && hasMoreData());
   }
 
@@ -333,7 +337,7 @@ public abstract class AbstractSortOperator implements 
ProcessOperator {
   public long calculateMaxPeekMemory() {
     return inputOperator.calculateMaxPeekMemoryWithCounter()
         + inputOperator.calculateRetainedSizeAfterCallingNext()
-        + SORT_BUFFER_SIZE;
+        + sortBufferManager.getSortBufferSize();
   }
 
   @Override
@@ -343,16 +347,19 @@ public abstract class AbstractSortOperator implements 
ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return inputOperator.calculateRetainedSizeAfterCallingNext() + 
SORT_BUFFER_SIZE;
+    return inputOperator.calculateRetainedSizeAfterCallingNext()
+        + sortBufferManager.getSortBufferSize();
   }
 
   protected void resetSortRelatedResource() {
     curRow = -1;
-    cachedData.clear();
+    cachedData = new ArrayList<>();
     cachedBytes = 0;
     clear();
-    sortBufferManager = new SortBufferManager();
-    if (!mergeSortHeap.isEmpty()) {
+    sortBufferManager =
+        new SortBufferManager(
+            sortBufferManager.getMaxTsBlockSizeInBytes(), 
sortBufferManager.getSortBufferSize());
+    if (mergeSortHeap != null && !mergeSortHeap.isEmpty()) {
       throw new IllegalStateException("mergeSortHeap should be empty!");
     }
     mergeSortHeap = null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
index 9a52f7464f1..b0d82725346 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
@@ -104,11 +104,16 @@ public class DiskSpiller {
     }
   }
 
-  private void writeData(List<TsBlock> sortedData, String fileName)
-      throws IOException, IoTDBException {
+  private void writeData(List<TsBlock> sortedData, String fileName) throws 
IoTDBException {
     Path filePath = Paths.get(fileName);
-    Files.createFile(filePath);
-    try (FileChannel fileChannel = FileChannel.open(filePath, 
StandardOpenOption.WRITE)) {
+    // for stream sort we may reuse the previous tmp file name, so we need 
TRUNCATE_EXISTING and
+    // CREATE
+    try (FileChannel fileChannel =
+        FileChannel.open(
+            filePath,
+            StandardOpenOption.WRITE,
+            StandardOpenOption.TRUNCATE_EXISTING,
+            StandardOpenOption.CREATE)) {
       for (TsBlock tsBlock : sortedData) {
         ByteBuffer tsBlockBuffer = serde.serialize(tsBlock);
         ByteBuffer length = ByteBuffer.allocate(4);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
index 8f2de51db7f..579f2a99b14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
@@ -19,33 +19,30 @@
 
 package org.apache.iotdb.db.utils.sort;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import org.apache.tsfile.common.conf.TSFileDescriptor;
-
 public class SortBufferManager {
-  private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
-      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
-  public static final long SORT_BUFFER_SIZE =
-      IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+  private final int maxTsBlockSizeInBytes;
+  private final long sortBufferSize;
 
   private long bufferUsed;
 
-  private static final long BUFFER_SIZE_FOR_ONE_BRANCH = 
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  private final long bufferSizeForOneBranch;
 
   private final long bufferAvailableForAllBranch;
   private long readerBuffer = 0;
   private long branchNum = 0;
 
-  public SortBufferManager() {
-    this.bufferAvailableForAllBranch = SORT_BUFFER_SIZE - 
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  public SortBufferManager(int maxTsBlockSizeInBytes, long sortBufferSize) {
+    this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+    this.sortBufferSize = sortBufferSize;
+    this.bufferAvailableForAllBranch = sortBufferSize - maxTsBlockSizeInBytes;
+    this.bufferSizeForOneBranch = maxTsBlockSizeInBytes;
     // the initial value is the buffer for output.
-    this.bufferUsed = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    this.bufferUsed = maxTsBlockSizeInBytes;
   }
 
   public void allocateOneSortBranch() {
-    boolean success = allocate(BUFFER_SIZE_FOR_ONE_BRANCH);
+    boolean success = allocate(bufferSizeForOneBranch);
     if (!success) {
       throw new IllegalArgumentException("Not enough memory for sorting");
     }
@@ -53,7 +50,7 @@ public class SortBufferManager {
   }
 
   private boolean check(long size) {
-    return bufferUsed + size < SORT_BUFFER_SIZE;
+    return bufferUsed + size < sortBufferSize;
   }
 
   public boolean allocate(long size) {
@@ -78,4 +75,12 @@ public class SortBufferManager {
     readerBuffer = bufferAvailableForAllBranch / branchNum;
     return readerBuffer;
   }
+
+  public int getMaxTsBlockSizeInBytes() {
+    return maxTsBlockSizeInBytes;
+  }
+
+  public long getSortBufferSize() {
+    return sortBufferSize;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
index da098b19801..99acfd45366 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -79,6 +80,9 @@ public class SortOperatorTest {
   private final List<TsFileResource> seqResources = new ArrayList<>();
   private final List<TsFileResource> unSeqResources = new ArrayList<>();
 
+  private final String sortTmpPrefixPath =
+      "target" + File.separator + "sort" + File.separator + "tmp";
+
   private int dataNodeId;
 
   private int maxTsBlockSizeInBytes;
@@ -96,6 +100,7 @@ public class SortOperatorTest {
   @After
   public void tearDown() throws IOException {
     SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    cleanDir(sortTmpPrefixPath);
     IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
     
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
   }
@@ -197,7 +202,7 @@ public class SortOperatorTest {
 
     OperatorContext operatorContext = 
driverContext.getOperatorContexts().get(3);
     String filePrefix =
-        "target"
+        sortTmpPrefixPath
             + File.separator
             + operatorContext
                 .getDriverContext()
@@ -228,10 +233,9 @@ public class SortOperatorTest {
   // with data spilling
   @Test
   public void sortOperatorSpillingTest() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
     long sortBufferSize = 
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
-    try {
-      IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
-      SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, true);
+    try (SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, 
true)) {
       int lastValue = -1;
       int count = 0;
       while (root.isBlocked().isDone() && root.hasNext()) {
@@ -248,7 +252,6 @@ public class SortOperatorTest {
           count++;
         }
       }
-      root.close();
       assertEquals(500, count);
     } finally {
       
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
@@ -258,24 +261,24 @@ public class SortOperatorTest {
   // no data spilling
   @Test
   public void sortOperatorNormalTest() throws Exception {
-    Operator root = genSortOperator(Ordering.ASC, true);
-    int lastValue = -1;
-    int count = 0;
-    while (root.isBlocked().isDone() && root.hasNext()) {
-      TsBlock tsBlock = root.next();
-      if (tsBlock == null) continue;
-      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-        long time = tsBlock.getTimeByIndex(i);
-        int v1 = tsBlock.getColumn(0).getInt(i);
-        int v2 = tsBlock.getColumn(1).getInt(i);
-        assertTrue(lastValue == -1 || lastValue < v1);
-        assertEquals(getValue(time), v1);
-        assertEquals(v1, v2);
-        lastValue = v1;
-        count++;
+    try (Operator root = genSortOperator(Ordering.ASC, true)) {
+      int lastValue = -1;
+      int count = 0;
+      while (root.isBlocked().isDone() && root.hasNext()) {
+        TsBlock tsBlock = root.next();
+        if (tsBlock == null) continue;
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long time = tsBlock.getTimeByIndex(i);
+          int v1 = tsBlock.getColumn(0).getInt(i);
+          int v2 = tsBlock.getColumn(1).getInt(i);
+          assertTrue(lastValue == -1 || lastValue < v1);
+          assertEquals(getValue(time), v1);
+          assertEquals(v1, v2);
+          lastValue = v1;
+          count++;
+        }
       }
+      assertEquals(500, count);
     }
-    root.close();
-    assertEquals(500, count);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
new file mode 100644
index 00000000000..f273a183604
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class StreamSortOperatorTest {
+
+  private static final String sortTmpPrefixPath =
+      "target" + File.separator + "sort" + File.separator + "tmp";
+
+  private static final ExecutorService instanceNotificationExecutor =
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"sortOperator-test-instance-notification");
+
+  private final long[] timeArray =
+      new long[] {
+        3L, 4L, 2L, 1L, 3L, 1L, 2L, 4L, 5L, 5L, 2L, 3L, 1L, 4L, 2L, 3L, 1L, 
4L, 5L, 1L, 2L, 3L, 4L,
+        5L, 4L, 1L, 2L, 5L, 3L, 5L, 4L, 3L, 2L, 1L
+      };
+  private final String[] column1Array =
+      new String[] {
+        null,
+        null,
+        null,
+        null,
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "beijing",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "shanghai",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou",
+        "yangzhou"
+      };
+  private final boolean[] column1IsNull =
+      new boolean[] {
+        true, true, true, true, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false
+      };
+  private final String[] column2Array =
+      new String[] {
+        "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d2", "d2", 
"d2", "d2", "d2", "d1",
+        "d1", "d1", "d1", "d1", "d2", "d2", "d2", "d2", "d2", "d1", "d1", 
"d1", "d1", "d1", "d2",
+        "d2", "d2", "d2", "d2"
+      };
+  private final boolean[] column2IsNull =
+      new boolean[] {
+        false, false, false, false, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false, false, false, 
false, false, false,
+        false, false, false, false, false, false, false, false
+      };
+  private final int[] column3Array =
+      new int[] {
+        6, 7, 8, 9, 0, 111, 112, 114, 115, 0, 121, 122, 123, 124, 0, 11, 12, 
14, 15, 21, 22, 23, 24,
+        25, 0, 11, 12, 13, 15, 21, 22, 23, 24, 25
+      };
+  private final boolean[] column3IsNull =
+      new boolean[] {
+        false, false, false, false, true, false, false, false, false, true, 
false, false, false,
+        false, true, false, false, false, false, false, false, false, false, 
false, true, false,
+        false, false, false, false, false, false, false, false
+      };
+
+  @After
+  public void cleanUp() throws IOException {
+    cleanDir(sortTmpPrefixPath);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    instanceNotificationExecutor.shutdown();
+  }
+
+  @Test
+  public void allInMemoryTest() {
+
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void allInMemoryTest2() {
+    int maxTsBlockLineNumber = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          assertEquals(2, tsBlock.getPositionCount());
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+    }
+  }
+
+  @Test
+  public void someInDiskTest() {
+
+    long sortBufferSize = 
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+    int maxTsBlockSizeInBytes =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+    }
+  }
+
+  @Test
+  public void someInDiskTest2() {
+    long sortBufferSize = 
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+    int maxTsBlockSizeInBytes =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    int maxTsBlockLineNumber = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+    try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+
+      int count = 0;
+      ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+      listenableFuture.get();
+      while (!streamSortOperator.isFinished() && streamSortOperator.hasNext()) 
{
+        TsBlock tsBlock = streamSortOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          assertEquals(2, tsBlock.getPositionCount());
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(
+                  column1Array[count],
+                  
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(
+                  column2Array[count],
+                  
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        listenableFuture = streamSortOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+    }
+  }
+
+  private StreamSortOperator genStreamSortOperator(int maxLinesToOutput) {
+    // child output
+    // Time, city,       deviceId,   s1
+    // 1     null           d1       9
+    // 2     null           d1       8
+    // ---------------------- TsBlock-1
+    // 3     null           d1       6
+    // 4     null           d1       7
+    // 1     beijing        d1       111
+    // 2     beijing        d1       112
+    // 3     beijing        d1       null
+    // 4     beijing        d1       114
+    // 5     beijing        d1       115
+    // ---------------------- TsBlock-2
+    // 1     beijing        d2       123
+    // 2     beijing        d2       121
+    // 3     beijing        d2       122
+    // 4     beijing        d2       124
+    // 5     beijing        d2       null
+    // ---------------------- TsBlock-3
+    // null
+    // ---------------------- TsBlock-4
+    // 1     shanghai       d1       12
+    // 2     shanghai       d1       null
+    // 3     shanghai       d1       11
+    // 4     shanghai       d1       14
+    // 5     shanghai       d1       15
+    // ---------------------- TsBlock-5
+    // 1     shanghai       d2       21
+    // 2     shanghai       d2       22
+    // 3     shanghai       d2       23
+    // 4     shanghai       d2       24
+    // 5     shanghai       d2       25
+    // 1     yangzhou       d1       11
+    // 2     yangzhou       d1       12
+    // ---------------------- TsBlock-6
+    // 3     yangzhou       d1       15
+    // 4     yangzhou       d1       null
+    // 5     yangzhou       d1       13
+    // ---------------------- TsBlock-7
+    // empty
+    // ---------------------- TsBlock-8
+    // 1     yangzhou       d2       25
+    // 2     yangzhou       d2       24
+    // 3     yangzhou       d2       23
+    // 4     yangzhou       d2       22
+    // ---------------------- TsBlock-9
+    // 5     yangzhou       d2       21
+    // ---------------------- TsBlock-10
+
+    // Construct operator tree
+    QueryId queryId = new QueryId("stub_query");
+
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId1, 
TableScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    driverContext.addOperatorContext(2, planNodeId2, 
StreamSortOperator.class.getSimpleName());
+    Operator childOperator =
+        new Operator() {
+
+          private final long[][] timeArray =
+              new long[][] {
+                {1L, 2L},
+                {3L, 4L, 1L, 2L, 3L, 4L, 5L},
+                {1L, 2L, 3L, 4L, 5L},
+                null,
+                {1L, 2L, 3L, 4L, 5L},
+                {1L, 2L, 3L, 4L, 5L, 1L, 2L},
+                {3L, 4L, 5L},
+                {},
+                {1L, 2L, 3L, 4L},
+                {5L}
+              };
+
+          private final String[][] cityArray =
+              new String[][] {
+                {null, null},
+                {null, null, "beijing", "beijing", "beijing", "beijing", 
"beijing"},
+                {"beijing", "beijing", "beijing", "beijing", "beijing"},
+                null,
+                {"shanghai", "shanghai", "shanghai", "shanghai", "shanghai"},
+                {
+                  "shanghai", "shanghai", "shanghai", "shanghai", "shanghai", 
"yangzhou", "yangzhou"
+                },
+                {"yangzhou", "yangzhou", "yangzhou"},
+                {},
+                {"yangzhou", "yangzhou", "yangzhou", "yangzhou"},
+                {"yangzhou"}
+              };
+
+          private final String[][] deviceIdArray =
+              new String[][] {
+                {"d1", "d1"},
+                {"d1", "d1", "d1", "d1", "d1", "d1", "d1"},
+                {"d2", "d2", "d2", "d2", "d2"},
+                null,
+                {"d1", "d1", "d1", "d1", "d1"},
+                {"d2", "d2", "d2", "d2", "d2", "d1", "d1"},
+                {"d1", "d1", "d1"},
+                {},
+                {"d2", "d2", "d2", "d2"},
+                {"d2"}
+              };
+
+          private final int[][] valueArray =
+              new int[][] {
+                {9, 8},
+                {6, 7, 111, 112, 0, 114, 115},
+                {123, 121, 122, 124, 0},
+                null,
+                {12, 0, 11, 14, 15},
+                {21, 22, 23, 24, 25, 11, 12},
+                {15, 0, 13},
+                {},
+                {25, 24, 23, 22},
+                {21}
+              };
+
+          private final boolean[][] valueIsNull =
+              new boolean[][] {
+                {false, false},
+                {false, false, false, false, true, false, false},
+                {false, false, false, false, true},
+                null,
+                {false, true, false, false, false},
+                {false, false, false, false, false, false, false},
+                {false, true, false},
+                {},
+                {false, false, false, false},
+                {false}
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return driverContext.getOperatorContexts().get(0);
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(
+                    timeArray[index].length,
+                    Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, 
TSDataType.INT32));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (cityArray[index][i] == null) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder
+                    .getColumnBuilder(0)
+                    .writeBinary(new Binary(cityArray[index][i], 
TSFileConfig.STRING_CHARSET));
+              }
+              if (deviceIdArray[index][i] == null) {
+                builder.getColumnBuilder(1).appendNull();
+              } else {
+                builder
+                    .getColumnBuilder(1)
+                    .writeBinary(new Binary(deviceIdArray[index][i], 
TSFileConfig.STRING_CHARSET));
+              }
+              if (valueIsNull[index][i]) {
+                builder.getColumnBuilder(2).appendNull();
+              } else {
+                builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() throws Exception {
+            return index < valueIsNull.length;
+          }
+
+          @Override
+          public void close() throws Exception {}
+
+          @Override
+          public boolean isFinished() throws Exception {
+            return index >= valueIsNull.length;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+
+          @Override
+          public long ramBytesUsed() {
+            return 0;
+          }
+        };
+
+    OperatorContext operatorContext = 
driverContext.getOperatorContexts().get(1);
+    String filePrefix =
+        sortTmpPrefixPath
+            + File.separator
+            + operatorContext
+                .getDriverContext()
+                .getFragmentInstanceContext()
+                .getId()
+                .getFragmentInstanceId()
+            + File.separator
+            + operatorContext.getDriverContext().getPipelineId()
+            + File.separator;
+
+    List<SortOrder> sortOrderList =
+        Arrays.asList(
+            SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST, 
SortOrder.ASC_NULLS_FIRST);
+    List<Integer> sortItemIndexList = Arrays.asList(0, 1, 2);
+    List<TSDataType> sortItemDataTypeList =
+        Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32);
+
+    Comparator<SortKey> comparator =
+        getComparatorForTable(sortOrderList, sortItemIndexList, 
sortItemDataTypeList);
+    ;
+
+    Comparator<SortKey> streamKeyComparator =
+        getComparatorForTable(
+            sortOrderList.subList(0, 2),
+            sortItemIndexList.subList(0, 2),
+            sortItemDataTypeList.subList(0, 2));
+
+    return new StreamSortOperator(
+        operatorContext,
+        childOperator,
+        Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32),
+        filePrefix,
+        comparator,
+        streamKeyComparator,
+        maxLinesToOutput);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
index b164677ae6c..98d6f765d4c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.sort;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
 
@@ -94,7 +95,10 @@ public class SortUtilTest {
       sortKeyList.add(new SortKey(tsBlock, i));
     }
 
-    SortBufferManager sortBufferManager = new SortBufferManager();
+    SortBufferManager sortBufferManager =
+        new SortBufferManager(
+            
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+            IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
     try {
       sortBufferManager.allocateOneSortBranch();
       diskSpiller.spillSortedData(sortKeyList);


Reply via email to