This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TableScanEachLimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableScanEachLimit by this
push:
new 22eeff0d7da Add support for StreamSortOperator
22eeff0d7da is described below
commit 22eeff0d7daf88c51611de6654fc202348447f98
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Jun 7 10:41:06 2024 +0800
Add support for StreamSortOperator
---
.../operator/process/AbstractSortOperator.java | 27 +-
.../apache/iotdb/db/utils/sort/DiskSpiller.java | 13 +-
.../iotdb/db/utils/sort/SortBufferManager.java | 33 +-
.../execution/operator/SortOperatorTest.java | 47 +-
.../operator/process/StreamSortOperatorTest.java | 591 +++++++++++++++++++++
.../apache/iotdb/db/utils/sort/SortUtilTest.java | 6 +-
6 files changed, 666 insertions(+), 51 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index bb8c7a16c2c..ca152e6d77b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
@@ -46,8 +47,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
-
public abstract class AbstractSortOperator implements ProcessOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSortOperator.class);
@@ -94,7 +93,10 @@ public abstract class AbstractSortOperator implements
ProcessOperator {
this.cachedBytes = 0;
this.diskSpiller =
new DiskSpiller(folderPath, folderPath +
operatorContext.getOperatorId(), dataTypes);
- this.sortBufferManager = new SortBufferManager();
+ this.sortBufferManager =
+ new SortBufferManager(
+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
}
protected void buildResult() throws IoTDBException {
@@ -162,7 +164,7 @@ public abstract class AbstractSortOperator implements
ProcessOperator {
protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
long bytesSize = tsBlock.getRetainedSizeInBytes();
- if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
+ if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) {
cachedBytes += bytesSize;
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
cachedData.add(new MergeSortKey(tsBlock, i));
@@ -312,7 +314,9 @@ public abstract class AbstractSortOperator implements
ProcessOperator {
}
protected boolean hasMoreSortedData() {
- return (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+ return (!diskSpiller.hasSpilledData()
+ && ((curRow == -1 && !cachedData.isEmpty())
+ || (curRow != -1 && curRow != cachedData.size())))
|| (diskSpiller.hasSpilledData() && hasMoreData());
}
@@ -333,7 +337,7 @@ public abstract class AbstractSortOperator implements
ProcessOperator {
public long calculateMaxPeekMemory() {
return inputOperator.calculateMaxPeekMemoryWithCounter()
+ inputOperator.calculateRetainedSizeAfterCallingNext()
- + SORT_BUFFER_SIZE;
+ + sortBufferManager.getSortBufferSize();
}
@Override
@@ -343,16 +347,19 @@ public abstract class AbstractSortOperator implements
ProcessOperator {
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return inputOperator.calculateRetainedSizeAfterCallingNext() +
SORT_BUFFER_SIZE;
+ return inputOperator.calculateRetainedSizeAfterCallingNext()
+ + sortBufferManager.getSortBufferSize();
}
protected void resetSortRelatedResource() {
curRow = -1;
- cachedData.clear();
+ cachedData = new ArrayList<>();
cachedBytes = 0;
clear();
- sortBufferManager = new SortBufferManager();
- if (!mergeSortHeap.isEmpty()) {
+ sortBufferManager =
+ new SortBufferManager(
+ sortBufferManager.getMaxTsBlockSizeInBytes(),
sortBufferManager.getSortBufferSize());
+ if (mergeSortHeap != null && !mergeSortHeap.isEmpty()) {
throw new IllegalStateException("mergeSortHeap should be empty!");
}
mergeSortHeap = null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
index 9a52f7464f1..b0d82725346 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
@@ -104,11 +104,16 @@ public class DiskSpiller {
}
}
- private void writeData(List<TsBlock> sortedData, String fileName)
- throws IOException, IoTDBException {
+ private void writeData(List<TsBlock> sortedData, String fileName) throws
IoTDBException {
Path filePath = Paths.get(fileName);
- Files.createFile(filePath);
- try (FileChannel fileChannel = FileChannel.open(filePath,
StandardOpenOption.WRITE)) {
+ // Stream sort may reuse a previous tmp file name, so open with CREATE (make the file
+ // if it is absent) and TRUNCATE_EXISTING (discard any stale content left in it).
+ try (FileChannel fileChannel =
+ FileChannel.open(
+ filePath,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.TRUNCATE_EXISTING,
+ StandardOpenOption.CREATE)) {
for (TsBlock tsBlock : sortedData) {
ByteBuffer tsBlockBuffer = serde.serialize(tsBlock);
ByteBuffer length = ByteBuffer.allocate(4);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
index 8f2de51db7f..579f2a99b14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
@@ -19,33 +19,30 @@
package org.apache.iotdb.db.utils.sort;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import org.apache.tsfile.common.conf.TSFileDescriptor;
-
public class SortBufferManager {
- private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
- TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
- public static final long SORT_BUFFER_SIZE =
- IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ private final int maxTsBlockSizeInBytes;
+ private final long sortBufferSize;
private long bufferUsed;
- private static final long BUFFER_SIZE_FOR_ONE_BRANCH =
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ private final long bufferSizeForOneBranch;
private final long bufferAvailableForAllBranch;
private long readerBuffer = 0;
private long branchNum = 0;
- public SortBufferManager() {
- this.bufferAvailableForAllBranch = SORT_BUFFER_SIZE -
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ public SortBufferManager(int maxTsBlockSizeInBytes, long sortBufferSize) {
+ this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+ this.sortBufferSize = sortBufferSize;
+ this.bufferAvailableForAllBranch = sortBufferSize - maxTsBlockSizeInBytes;
+ this.bufferSizeForOneBranch = maxTsBlockSizeInBytes;
// the initial value is the buffer for output.
- this.bufferUsed = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ this.bufferUsed = maxTsBlockSizeInBytes;
}
public void allocateOneSortBranch() {
- boolean success = allocate(BUFFER_SIZE_FOR_ONE_BRANCH);
+ boolean success = allocate(bufferSizeForOneBranch);
if (!success) {
throw new IllegalArgumentException("Not enough memory for sorting");
}
@@ -53,7 +50,7 @@ public class SortBufferManager {
}
private boolean check(long size) {
- return bufferUsed + size < SORT_BUFFER_SIZE;
+ return bufferUsed + size < sortBufferSize;
}
public boolean allocate(long size) {
@@ -78,4 +75,12 @@ public class SortBufferManager {
readerBuffer = bufferAvailableForAllBranch / branchNum;
return readerBuffer;
}
+
+ public int getMaxTsBlockSizeInBytes() {
+ return maxTsBlockSizeInBytes;
+ }
+
+ public long getSortBufferSize() {
+ return sortBufferSize;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
index da098b19801..99acfd45366 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -79,6 +80,9 @@ public class SortOperatorTest {
private final List<TsFileResource> seqResources = new ArrayList<>();
private final List<TsFileResource> unSeqResources = new ArrayList<>();
+ private final String sortTmpPrefixPath =
+ "target" + File.separator + "sort" + File.separator + "tmp";
+
private int dataNodeId;
private int maxTsBlockSizeInBytes;
@@ -96,6 +100,7 @@ public class SortOperatorTest {
@After
public void tearDown() throws IOException {
SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ cleanDir(sortTmpPrefixPath);
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
}
@@ -197,7 +202,7 @@ public class SortOperatorTest {
OperatorContext operatorContext =
driverContext.getOperatorContexts().get(3);
String filePrefix =
- "target"
+ sortTmpPrefixPath
+ File.separator
+ operatorContext
.getDriverContext()
@@ -228,10 +233,9 @@ public class SortOperatorTest {
// with data spilling
@Test
public void sortOperatorSpillingTest() throws Exception {
long sortBufferSize =
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
- try {
- IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
- SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, true);
+ try (SortOperator root = (SortOperator) genSortOperator(Ordering.ASC,
true)) {
int lastValue = -1;
int count = 0;
while (root.isBlocked().isDone() && root.hasNext()) {
@@ -248,7 +252,6 @@ public class SortOperatorTest {
count++;
}
}
- root.close();
assertEquals(500, count);
} finally {
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
@@ -258,24 +261,24 @@ public class SortOperatorTest {
// no data spilling
@Test
public void sortOperatorNormalTest() throws Exception {
- Operator root = genSortOperator(Ordering.ASC, true);
- int lastValue = -1;
- int count = 0;
- while (root.isBlocked().isDone() && root.hasNext()) {
- TsBlock tsBlock = root.next();
- if (tsBlock == null) continue;
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- long time = tsBlock.getTimeByIndex(i);
- int v1 = tsBlock.getColumn(0).getInt(i);
- int v2 = tsBlock.getColumn(1).getInt(i);
- assertTrue(lastValue == -1 || lastValue < v1);
- assertEquals(getValue(time), v1);
- assertEquals(v1, v2);
- lastValue = v1;
- count++;
+ try (Operator root = genSortOperator(Ordering.ASC, true)) {
+ int lastValue = -1;
+ int count = 0;
+ while (root.isBlocked().isDone() && root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock == null) continue;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeByIndex(i);
+ int v1 = tsBlock.getColumn(0).getInt(i);
+ int v2 = tsBlock.getColumn(1).getInt(i);
+ assertTrue(lastValue == -1 || lastValue < v1);
+ assertEquals(getValue(time), v1);
+ assertEquals(v1, v2);
+ lastValue = v1;
+ count++;
+ }
}
+ assertEquals(500, count);
}
- root.close();
- assertEquals(500, count);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
new file mode 100644
index 00000000000..f273a183604
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class StreamSortOperatorTest {
+
+ private static final String sortTmpPrefixPath =
+ "target" + File.separator + "sort" + File.separator + "tmp";
+
+ private static final ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"sortOperator-test-instance-notification");
+
+ private final long[] timeArray =
+ new long[] {
+ 3L, 4L, 2L, 1L, 3L, 1L, 2L, 4L, 5L, 5L, 2L, 3L, 1L, 4L, 2L, 3L, 1L,
4L, 5L, 1L, 2L, 3L, 4L,
+ 5L, 4L, 1L, 2L, 5L, 3L, 5L, 4L, 3L, 2L, 1L
+ };
+ private final String[] column1Array =
+ new String[] {
+ null,
+ null,
+ null,
+ null,
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou"
+ };
+ private final boolean[] column1IsNull =
+ new boolean[] {
+ true, true, true, true, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false
+ };
+ private final String[] column2Array =
+ new String[] {
+ "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d2", "d2",
"d2", "d2", "d2", "d1",
+ "d1", "d1", "d1", "d1", "d2", "d2", "d2", "d2", "d2", "d1", "d1",
"d1", "d1", "d1", "d2",
+ "d2", "d2", "d2", "d2"
+ };
+ private final boolean[] column2IsNull =
+ new boolean[] {
+ false, false, false, false, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false
+ };
+ private final int[] column3Array =
+ new int[] {
+ 6, 7, 8, 9, 0, 111, 112, 114, 115, 0, 121, 122, 123, 124, 0, 11, 12,
14, 15, 21, 22, 23, 24,
+ 25, 0, 11, 12, 13, 15, 21, 22, 23, 24, 25
+ };
+ private final boolean[] column3IsNull =
+ new boolean[] {
+ false, false, false, false, true, false, false, false, false, true,
false, false, false,
+ false, true, false, false, false, false, false, false, false, false,
false, true, false,
+ false, false, false, false, false, false, false, false
+ };
+
+ @After
+ public void cleanUp() throws IOException {
+ cleanDir(sortTmpPrefixPath);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ instanceNotificationExecutor.shutdown();
+ }
+
+ @Test
+ public void allInMemoryTest() {
+
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allInMemoryTes2() {
+ int maxTsBlockLineNumber =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ assertEquals(2, tsBlock.getPositionCount());
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+ }
+ }
+
+ @Test
+ public void someInDiskTest() {
+
+ long sortBufferSize =
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ int maxTsBlockSizeInBytes =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+ }
+ }
+
+ @Test
+ public void someInDiskTest2() {
+ long sortBufferSize =
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ int maxTsBlockSizeInBytes =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ int maxTsBlockLineNumber =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ assertEquals(2, tsBlock.getPositionCount());
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+ }
+ }
+
+ private StreamSortOperator genStreamSortOperator(int maxLinesToOutput) {
+ // child output
+ // Time, city, deviceId, s1
+ // 1 null d1 9
+ // 2 null d1 8
+ // ---------------------- TsBlock-1
+ // 3 null d1 6
+ // 4 null d1 7
+ // 1 beijing d1 111
+ // 2 beijing d1 112
+ // 3 beijing d1 null
+ // 4 beijing d1 114
+ // 5 beijing d1 115
+ // ---------------------- TsBlock-2
+ // 1 beijing d2 123
+ // 2 beijing d2 121
+ // 3 beijing d2 122
+ // 4 beijing d2 124
+ // 5 beijing d2 null
+ // ---------------------- TsBlock-3
+ // null
+ // ---------------------- TsBlock-4
+ // 1 shanghai d1 12
+ // 2 shanghai d1 null
+ // 3 shanghai d1 11
+ // 4 shanghai d1 14
+ // 5 shanghai d1 15
+ // ---------------------- TsBlock-5
+ // 1 shanghai d2 21
+ // 2 shanghai d2 22
+ // 3 shanghai d2 23
+ // 4 shanghai d2 24
+ // 5 shanghai d2 25
+ // 1 yangzhou d1 11
+ // 2 yangzhou d1 12
+ // ---------------------- TsBlock-6
+ // 3 yangzhou d1 15
+ // 4 yangzhou d1 null
+ // 5 yangzhou d1 13
+ // ---------------------- TsBlock-7
+ // empty
+ // ---------------------- TsBlock-8
+ // 1 yangzhou d2 25
+ // 2 yangzhou d2 24
+ // 3 yangzhou d2 23
+ // 4 yangzhou d2 22
+ // ---------------------- TsBlock-9
+ // 5 yangzhou d2 21
+ // ---------------------- TsBlock-10
+
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
TableScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
StreamSortOperator.class.getSimpleName());
+ Operator childOperator =
+ new Operator() {
+
+ private final long[][] timeArray =
+ new long[][] {
+ {1L, 2L},
+ {3L, 4L, 1L, 2L, 3L, 4L, 5L},
+ {1L, 2L, 3L, 4L, 5L},
+ null,
+ {1L, 2L, 3L, 4L, 5L},
+ {1L, 2L, 3L, 4L, 5L, 1L, 2L},
+ {3L, 4L, 5L},
+ {},
+ {1L, 2L, 3L, 4L},
+ {5L}
+ };
+
+ private final String[][] cityArray =
+ new String[][] {
+ {null, null},
+ {null, null, "beijing", "beijing", "beijing", "beijing",
"beijing"},
+ {"beijing", "beijing", "beijing", "beijing", "beijing"},
+ null,
+ {"shanghai", "shanghai", "shanghai", "shanghai", "shanghai"},
+ {
+ "shanghai", "shanghai", "shanghai", "shanghai", "shanghai",
"yangzhou", "yangzhou"
+ },
+ {"yangzhou", "yangzhou", "yangzhou"},
+ {},
+ {"yangzhou", "yangzhou", "yangzhou", "yangzhou"},
+ {"yangzhou"}
+ };
+
+ private final String[][] deviceIdArray =
+ new String[][] {
+ {"d1", "d1"},
+ {"d1", "d1", "d1", "d1", "d1", "d1", "d1"},
+ {"d2", "d2", "d2", "d2", "d2"},
+ null,
+ {"d1", "d1", "d1", "d1", "d1"},
+ {"d2", "d2", "d2", "d2", "d2", "d1", "d1"},
+ {"d1", "d1", "d1"},
+ {},
+ {"d2", "d2", "d2", "d2"},
+ {"d2"}
+ };
+
+ private final int[][] valueArray =
+ new int[][] {
+ {9, 8},
+ {6, 7, 111, 112, 0, 114, 115},
+ {123, 121, 122, 124, 0},
+ null,
+ {12, 0, 11, 14, 15},
+ {21, 22, 23, 24, 25, 11, 12},
+ {15, 0, 13},
+ {},
+ {25, 24, 23, 22},
+ {21}
+ };
+
+ private final boolean[][] valueIsNull =
+ new boolean[][] {
+ {false, false},
+ {false, false, false, false, true, false, false},
+ {false, false, false, false, true},
+ null,
+ {false, true, false, false, false},
+ {false, false, false, false, false, false, false},
+ {false, true, false},
+ {},
+ {false, false, false, false},
+ {false}
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return driverContext.getOperatorContexts().get(0);
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length,
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT,
TSDataType.INT32));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (cityArray[index][i] == null) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder
+ .getColumnBuilder(0)
+ .writeBinary(new Binary(cityArray[index][i],
TSFileConfig.STRING_CHARSET));
+ }
+ if (deviceIdArray[index][i] == null) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+ builder
+ .getColumnBuilder(1)
+ .writeBinary(new Binary(deviceIdArray[index][i],
TSFileConfig.STRING_CHARSET));
+ }
+ if (valueIsNull[index][i]) {
+ builder.getColumnBuilder(2).appendNull();
+ } else {
+ builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return index < valueIsNull.length;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean isFinished() throws Exception {
+ return index >= valueIsNull.length;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ };
+
+ OperatorContext operatorContext =
driverContext.getOperatorContexts().get(1);
+ String filePrefix =
+ sortTmpPrefixPath
+ + File.separator
+ + operatorContext
+ .getDriverContext()
+ .getFragmentInstanceContext()
+ .getId()
+ .getFragmentInstanceId()
+ + File.separator
+ + operatorContext.getDriverContext().getPipelineId()
+ + File.separator;
+
+ List<SortOrder> sortOrderList =
+ Arrays.asList(
+ SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST,
SortOrder.ASC_NULLS_FIRST);
+ List<Integer> sortItemIndexList = Arrays.asList(0, 1, 2);
+ List<TSDataType> sortItemDataTypeList =
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32);
+
+ Comparator<SortKey> comparator =
+ getComparatorForTable(sortOrderList, sortItemIndexList,
sortItemDataTypeList);
+ ;
+
+ Comparator<SortKey> streamKeyComparator =
+ getComparatorForTable(
+ sortOrderList.subList(0, 2),
+ sortItemIndexList.subList(0, 2),
+ sortItemDataTypeList.subList(0, 2));
+
+ return new StreamSortOperator(
+ operatorContext,
+ childOperator,
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32),
+ filePrefix,
+ comparator,
+ streamKeyComparator,
+ maxLinesToOutput);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
index b164677ae6c..98d6f765d4c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.sort;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -94,7 +95,10 @@ public class SortUtilTest {
sortKeyList.add(new SortKey(tsBlock, i));
}
- SortBufferManager sortBufferManager = new SortBufferManager();
+ SortBufferManager sortBufferManager =
+ new SortBufferManager(
+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
try {
sortBufferManager.allocateOneSortBranch();
diskSpiller.spillSortedData(sortKeyList);