This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 411255134b1 Add support for StreamSortOperator
411255134b1 is described below
commit 411255134b116640412bbac41a0b7e1aa2ef0c79
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Jun 7 17:32:02 2024 +0800
Add support for StreamSortOperator
---
...SortOperator.java => AbstractSortOperator.java} | 146 +++--
.../execution/operator/process/SortOperator.java | 309 +----------
.../operator/process/StreamSortOperator.java | 203 +++++++
.../plan/planner/TableOperatorGenerator.java | 53 ++
.../planner/plan/parameter/SeriesScanOptions.java | 32 +-
.../apache/iotdb/db/utils/sort/DiskSpiller.java | 17 +-
.../iotdb/db/utils/sort/SortBufferManager.java | 33 +-
.../execution/operator/SortOperatorTest.java | 47 +-
.../operator/process/StreamSortOperatorTest.java | 591 +++++++++++++++++++++
.../apache/iotdb/db/utils/sort/SortUtilTest.java | 6 +-
10 files changed, 1009 insertions(+), 428 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
similarity index 77%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index e34112c2903..ca152e6d77b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -17,14 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.operator.process.relational;
+package org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
-import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -40,7 +38,6 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
-import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,17 +47,13 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
+public abstract class AbstractSortOperator implements ProcessOperator {
-public class StreamSortOperator implements ProcessOperator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSortOperator.class);
- private static final Logger LOGGER =
LoggerFactory.getLogger(StreamSortOperator.class);
-
- private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
- private final OperatorContext operatorContext;
- private final Operator inputOperator;
- private final TsBlockBuilder tsBlockBuilder;
+ protected final OperatorContext operatorContext;
+ protected final Operator inputOperator;
+ protected final TsBlockBuilder tsBlockBuilder;
// Use to output the result in memory.
// Because the memory may be larger than tsBlockBuilder's max size
@@ -71,22 +64,22 @@ public class StreamSortOperator implements ProcessOperator {
private final Comparator<SortKey> comparator;
private long cachedBytes;
private final DiskSpiller diskSpiller;
- private final SortBufferManager sortBufferManager;
+ private SortBufferManager sortBufferManager;
// For mergeSort
private MergeSortHeap mergeSortHeap;
private List<SortReader> sortReaders;
- private boolean[] noMoreData;
+ protected boolean[] noMoreData;
private final int maxReturnSize =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
- private long prepareUntilReadyCost = 0;
- private long dataSize = 0;
+ protected long prepareUntilReadyCost = 0;
+ protected long dataSize = 0;
private long sortCost = 0;
- public StreamSortOperator(
+ AbstractSortOperator(
OperatorContext operatorContext,
Operator inputOperator,
List<TSDataType> dataTypes,
@@ -100,7 +93,30 @@ public class StreamSortOperator implements ProcessOperator {
this.cachedBytes = 0;
this.diskSpiller =
new DiskSpiller(folderPath, folderPath +
operatorContext.getOperatorId(), dataTypes);
- this.sortBufferManager = new SortBufferManager();
+ this.sortBufferManager =
+ new SortBufferManager(
+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
+ }
+
+ protected void buildResult() throws IoTDBException {
+ if (diskSpiller.hasSpilledData()) {
+ try {
+ prepareSortReaders();
+ mergeSort();
+ } catch (Exception e) {
+ clear();
+ throw e;
+ }
+ } else {
+ if (curRow == -1) {
+ long startTime = System.nanoTime();
+ cachedData.sort(comparator);
+ sortCost += System.nanoTime() - startTime;
+ curRow = 0;
+ }
+ buildTsBlockInMemory();
+ }
}
@Override
@@ -113,45 +129,6 @@ public class StreamSortOperator implements ProcessOperator
{
return inputOperator.isBlocked();
}
- @Override
- public TsBlock next() throws Exception {
- if (!inputOperator.hasNextWithTimer()) {
- if (diskSpiller.hasSpilledData()) {
- try {
- prepareSortReaders();
- return mergeSort();
- } catch (Exception e) {
- clear();
- throw e;
- }
- } else {
- if (curRow == -1) {
- long startTime = System.nanoTime();
- cachedData.sort(comparator);
- sortCost += System.nanoTime() - startTime;
- curRow = 0;
- }
- return buildTsBlockInMemory();
- }
- }
- long startTime = System.nanoTime();
- try {
- TsBlock tsBlock = inputOperator.nextWithTimer();
- if (tsBlock == null) {
- return null;
- }
- dataSize += tsBlock.getRetainedSizeInBytes();
- cacheTsBlock(tsBlock);
- } catch (IoTDBException e) {
- clear();
- throw e;
- } finally {
- prepareUntilReadyCost += System.nanoTime() - startTime;
- }
-
- return null;
- }
-
private void recordMetrics() {
operatorContext.recordSpecifiedInfo("prepareCost/ns",
Long.toString(prepareUntilReadyCost));
operatorContext.recordSpecifiedInfo("sortedDataSize",
Long.toString(dataSize));
@@ -185,9 +162,9 @@ public class StreamSortOperator implements ProcessOperator {
noMoreData = new boolean[sortReaders.size()];
}
- private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+ protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
long bytesSize = tsBlock.getRetainedSizeInBytes();
- if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
+ if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) {
cachedBytes += bytesSize;
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
cachedData.add(new MergeSortKey(tsBlock, i));
@@ -211,8 +188,7 @@ public class StreamSortOperator implements ProcessOperator {
diskSpiller.spillSortedData(cachedData);
}
- private TsBlock buildTsBlockInMemory() {
- tsBlockBuilder.reset();
+ private void buildTsBlockInMemory() {
TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
for (int i = curRow; i < cachedData.size(); i++) {
@@ -232,11 +208,9 @@ public class StreamSortOperator implements ProcessOperator
{
break;
}
}
-
- return tsBlockBuilder.build();
}
- private TsBlock mergeSort() throws IoTDBException {
+ private void mergeSort() throws IoTDBException {
// 1. fill the input from each reader
initMergeSortHeap();
@@ -245,7 +219,6 @@ public class StreamSortOperator implements ProcessOperator {
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
// 2. do merge sort until one TsBlock is consumed up
- tsBlockBuilder.reset();
TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
while (!mergeSortHeap.isEmpty()) {
@@ -277,7 +250,6 @@ public class StreamSortOperator implements ProcessOperator {
}
}
sortCost += System.nanoTime() - startTime;
- return tsBlockBuilder.build();
}
private void initMergeSortHeap() throws IoTDBException {
@@ -329,15 +301,22 @@ public class StreamSortOperator implements
ProcessOperator {
sortReader.close();
}
}
+ sortReaders = null;
+ diskSpiller.reset();
} catch (Exception e) {
- LOGGER.error("Fail to close fileChannel", e);
+ LOGGER.warn("Fail to close fileChannel", e);
}
}
@Override
public boolean hasNext() throws Exception {
- return inputOperator.hasNextWithTimer()
- || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+ return inputOperator.hasNextWithTimer() || hasMoreSortedData();
+ }
+
+ protected boolean hasMoreSortedData() {
+ return (!diskSpiller.hasSpilledData()
+ && ((curRow == -1 && !cachedData.isEmpty())
+ || (curRow != -1 && curRow != cachedData.size())))
|| (diskSpiller.hasSpilledData() && hasMoreData());
}
@@ -358,7 +337,7 @@ public class StreamSortOperator implements ProcessOperator {
public long calculateMaxPeekMemory() {
return inputOperator.calculateMaxPeekMemoryWithCounter()
+ inputOperator.calculateRetainedSizeAfterCallingNext()
- + SORT_BUFFER_SIZE;
+ + sortBufferManager.getSortBufferSize();
}
@Override
@@ -368,15 +347,22 @@ public class StreamSortOperator implements
ProcessOperator {
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return inputOperator.calculateRetainedSizeAfterCallingNext() +
SORT_BUFFER_SIZE;
+ return inputOperator.calculateRetainedSizeAfterCallingNext()
+ + sortBufferManager.getSortBufferSize();
}
- @Override
- public long ramBytesUsed() {
- return INSTANCE_SIZE
- +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
- +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
- + RamUsageEstimator.sizeOf(noMoreData)
- + tsBlockBuilder.getRetainedSizeInBytes();
+ protected void resetSortRelatedResource() {
+ curRow = -1;
+ cachedData = new ArrayList<>();
+ cachedBytes = 0;
+ clear();
+ sortBufferManager =
+ new SortBufferManager(
+ sortBufferManager.getMaxTsBlockSizeInBytes(),
sortBufferManager.getSortBufferSize());
+ if (mergeSortHeap != null && !mergeSortHeap.isEmpty()) {
+ throw new IllegalStateException("mergeSortHeap should be empty!");
+ }
+ mergeSortHeap = null;
+ noMoreData = null;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
index 55453353f0d..0f9ca2bd848 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
@@ -23,66 +23,19 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
-import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.db.utils.datastructure.SortKey;
-import org.apache.iotdb.db.utils.sort.DiskSpiller;
-import org.apache.iotdb.db.utils.sort.MemoryReader;
-import org.apache.iotdb.db.utils.sort.SortBufferManager;
-import org.apache.iotdb.db.utils.sort.SortReader;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.RamUsageEstimator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import static
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
-
-public class SortOperator implements ProcessOperator {
+public class SortOperator extends AbstractSortOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(SortOperator.class);
- private final OperatorContext operatorContext;
- private final Operator inputOperator;
- private final TsBlockBuilder tsBlockBuilder;
-
- // Use to output the result in memory.
- // Because the memory may be larger than tsBlockBuilder's max size
- // so the data may be return in multiple times.
- private int curRow = -1;
-
- private List<SortKey> cachedData;
- private final Comparator<SortKey> comparator;
- private long cachedBytes;
- private final DiskSpiller diskSpiller;
- private final SortBufferManager sortBufferManager;
-
- // For mergeSort
-
- private MergeSortHeap mergeSortHeap;
- private List<SortReader> sortReaders;
- private boolean[] noMoreData;
-
- private static final Logger logger =
LoggerFactory.getLogger(SortOperator.class);
-
- private final int maxReturnSize =
- TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
- private long prepareUntilReadyCost = 0;
- private long dataSize = 0;
- private long sortCost = 0;
public SortOperator(
OperatorContext operatorContext,
@@ -90,47 +43,16 @@ public class SortOperator implements ProcessOperator {
List<TSDataType> dataTypes,
String folderPath,
Comparator<SortKey> comparator) {
- this.operatorContext = operatorContext;
- this.inputOperator = inputOperator;
- this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.cachedData = new ArrayList<>();
- this.comparator = comparator;
- this.cachedBytes = 0;
- this.diskSpiller =
- new DiskSpiller(folderPath, folderPath +
operatorContext.getOperatorId(), dataTypes);
- this.sortBufferManager = new SortBufferManager();
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- return inputOperator.isBlocked();
+ super(operatorContext, inputOperator, dataTypes, folderPath, comparator);
}
@Override
public TsBlock next() throws Exception {
if (!inputOperator.hasNextWithTimer()) {
- if (diskSpiller.hasSpilledData()) {
- try {
- prepareSortReaders();
- return mergeSort();
- } catch (Exception e) {
- clear();
- throw e;
- }
- } else {
- if (curRow == -1) {
- long startTime = System.nanoTime();
- cachedData.sort(comparator);
- sortCost += System.nanoTime() - startTime;
- curRow = 0;
- }
- return buildTsBlockInMemory();
- }
+ buildResult();
+ TsBlock res = tsBlockBuilder.build();
+ tsBlockBuilder.reset();
+ return res;
}
long startTime = System.nanoTime();
try {
@@ -150,225 +72,6 @@ public class SortOperator implements ProcessOperator {
return null;
}
- private void recordMetrics() {
- operatorContext.recordSpecifiedInfo("prepareCost/ns",
Long.toString(prepareUntilReadyCost));
- operatorContext.recordSpecifiedInfo("sortedDataSize",
Long.toString(dataSize));
- operatorContext.recordSpecifiedInfo("sortCost/ns",
Long.toString(sortCost));
- int spilledFileSize = diskSpiller.getFileSize();
- if (spilledFileSize > 0) {
- operatorContext.recordSpecifiedInfo(
- "merge sort branch", Integer.toString(diskSpiller.getFileSize() +
1));
- }
- }
-
- private void prepareSortReaders() throws IoTDBException {
- if (sortReaders != null) {
- return;
- }
- sortReaders = new ArrayList<>();
- if (cachedBytes != 0) {
- cachedData.sort(comparator);
- if (sortBufferManager.allocate(cachedBytes)) {
- sortReaders.add(
- new MemoryReader(
-
cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList())));
- } else {
- sortBufferManager.allocateOneSortBranch();
- diskSpiller.spillSortedData(cachedData);
- cachedData = null;
- }
- }
- sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
- // if reader is finished
- noMoreData = new boolean[sortReaders.size()];
- }
-
- private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
- long bytesSize = tsBlock.getRetainedSizeInBytes();
- if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
- cachedBytes += bytesSize;
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- cachedData.add(new MergeSortKey(tsBlock, i));
- }
- } else {
- cachedData.sort(comparator);
- spill();
- cachedData.clear();
- cachedBytes = bytesSize;
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- cachedData.add(new MergeSortKey(tsBlock, i));
- }
- }
- }
-
- private void spill() throws IoTDBException {
- // if current memory cannot put this tsBlock, an exception will be thrown in spillSortedData()
- // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
- // one branch.
- sortBufferManager.allocateOneSortBranch();
- diskSpiller.spillSortedData(cachedData);
- }
-
- private TsBlock buildTsBlockInMemory() {
- tsBlockBuilder.reset();
- TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
- ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- for (int i = curRow; i < cachedData.size(); i++) {
- SortKey sortKey = cachedData.get(i);
- TsBlock tsBlock = sortKey.tsBlock;
- timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex));
- for (int j = 0; j < valueColumnBuilders.length; j++) {
- if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) {
- valueColumnBuilders[j].appendNull();
- continue;
- }
- valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex);
- }
- tsBlockBuilder.declarePosition();
- curRow++;
- if (tsBlockBuilder.isFull()) {
- break;
- }
- }
-
- return tsBlockBuilder.build();
- }
-
- private TsBlock mergeSort() throws IoTDBException {
-
- // 1. fill the input from each reader
- initMergeSortHeap();
-
- long startTime = System.nanoTime();
- long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
-
- // 2. do merge sort until one TsBlock is consumed up
- tsBlockBuilder.reset();
- TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
- ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- while (!mergeSortHeap.isEmpty()) {
-
- MergeSortKey mergeSortKey = mergeSortHeap.poll();
- TsBlock targetBlock = mergeSortKey.tsBlock;
- timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
- for (int i = 0; i < valueColumnBuilders.length; i++) {
- if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
- valueColumnBuilders[i].appendNull();
- continue;
- }
- valueColumnBuilders[i].write(targetBlock.getColumn(i),
mergeSortKey.rowIndex);
- }
- tsBlockBuilder.declarePosition();
-
- int readerIndex = mergeSortKey.inputChannelIndex;
- mergeSortKey = readNextMergeSortKey(readerIndex);
- if (mergeSortKey != null) {
- mergeSortHeap.push(mergeSortKey);
- } else {
- noMoreData[readerIndex] = true;
- sortBufferManager.releaseOneSortBranch();
- }
-
- // break if time is out or tsBlockBuilder is full or sortBuffer is not enough
- if (System.nanoTime() - startTime > maxRuntime ||
tsBlockBuilder.isFull()) {
- break;
- }
- }
- sortCost += System.nanoTime() - startTime;
- return tsBlockBuilder.build();
- }
-
- private void initMergeSortHeap() throws IoTDBException {
- if (mergeSortHeap == null) {
- mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
- for (int i = 0; i < sortReaders.size(); i++) {
- SortReader sortReader = sortReaders.get(i);
- if (sortReader.hasNext()) {
- MergeSortKey mergeSortKey = sortReader.next();
- mergeSortKey.inputChannelIndex = i;
- mergeSortHeap.push(mergeSortKey);
- } else {
- noMoreData[i] = true;
- sortBufferManager.releaseOneSortBranch();
- }
- }
- }
- }
-
- private MergeSortKey readNextMergeSortKey(int readerIndex) throws
IoTDBException {
- SortReader sortReader = sortReaders.get(readerIndex);
- if (sortReader.hasNext()) {
- MergeSortKey mergeSortKey = sortReader.next();
- mergeSortKey.inputChannelIndex = readerIndex;
- return mergeSortKey;
- }
- return null;
- }
-
- private boolean hasMoreData() {
- if (noMoreData == null) {
- return true;
- }
- for (boolean noMore : noMoreData) {
- if (!noMore) {
- return true;
- }
- }
- return false;
- }
-
- public void clear() {
- if (!diskSpiller.hasSpilledData()) {
- return;
- }
- try {
- if (sortReaders != null) {
- for (SortReader sortReader : sortReaders) {
- sortReader.close();
- }
- }
- } catch (Exception e) {
- logger.error("Fail to close fileChannel", e);
- }
- }
-
- @Override
- public boolean hasNext() throws Exception {
- return inputOperator.hasNextWithTimer()
- || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
- || (diskSpiller.hasSpilledData() && hasMoreData());
- }
-
- @Override
- public void close() throws Exception {
- recordMetrics();
- cachedData = null;
- clear();
- inputOperator.close();
- }
-
- @Override
- public boolean isFinished() throws Exception {
- return !this.hasNextWithTimer();
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return inputOperator.calculateMaxPeekMemoryWithCounter()
- + inputOperator.calculateRetainedSizeAfterCallingNext()
- + SORT_BUFFER_SIZE;
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return inputOperator.calculateRetainedSizeAfterCallingNext() +
SORT_BUFFER_SIZE;
- }
-
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
new file mode 100644
index 00000000000..d53a3480f93
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Comparator;
+import java.util.List;
+
+public class StreamSortOperator extends AbstractSortOperator {
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(StreamSortOperator.class);
+
+ private final Comparator<SortKey> streamSortComparator;
+
+ private final int minLinesToOutput;
+
+ // accumulated unprinted line count
+ private long remainingCount = 0;
+
+ // child operator has no more data
+ private boolean noMoreDataFromChild = false;
+
+ // unprocessed data from the previous iteration
+ private TsBlock currentTsBlock = null;
+
+ // be able to start stream outputting partial data
+ private boolean canStreamOutput = false;
+
+ private SortKey lastRow = null;
+
+ public StreamSortOperator(
+ OperatorContext operatorContext,
+ Operator inputOperator,
+ List<TSDataType> dataTypes,
+ String folderPath,
+ Comparator<SortKey> comparator,
+ Comparator<SortKey> streamSortComparator,
+ int minLinesToOutput) {
+ super(operatorContext, inputOperator, dataTypes, folderPath, comparator);
+ this.streamSortComparator = streamSortComparator;
+ this.minLinesToOutput = minLinesToOutput;
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ if (canStreamOutput || !tsBlockBuilder.isEmpty()) {
+
+ buildResult();
+
+ // this time stream output data has been consumed up
+ if (!hasMoreSortedData()) {
+ canStreamOutput = false;
+ // clear spilled disk space and memory data structure
+ resetSortRelatedResource();
+ }
+
+ if (tsBlockBuilder.isFull() || consumedUp()) {
+ TsBlock res = tsBlockBuilder.build();
+ remainingCount -= res.getPositionCount();
+ tsBlockBuilder.reset();
+ return res;
+ }
+ }
+
+ if (currentTsBlock != null) {
+ cacheTsBlock(currentTsBlock);
+ currentTsBlock = null;
+ }
+
+ long startTime = System.nanoTime();
+ if (!inputOperator.hasNextWithTimer()) {
+ noMoreDataFromChild = true;
+ canStreamOutput = true;
+ } else {
+ try {
+ // init currentTsBlock from child operator
+ currentTsBlock = inputOperator.nextWithTimer();
+ if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+ currentTsBlock = null;
+ return null;
+ }
+ // record total sorted data size
+ dataSize += currentTsBlock.getRetainedSizeInBytes();
+
+ // if currentTsBlock line count + remainingCount is still less than minLinesToOutput, just
+ // cache it
+ if (currentTsBlock.getPositionCount() + remainingCount <
minLinesToOutput) {
+ cacheTsBlock(currentTsBlock);
+ remainingCount += currentTsBlock.getPositionCount();
+ currentTsBlock = null;
+ } else {
+ // if stream compare key of the last row of currentTsBlock is same as the last row of
+ // previous TsBlock, we cannot stream output, just cache it
+ if (isStreamCompareKeySame()) {
+ cacheTsBlock(currentTsBlock);
+ remainingCount += currentTsBlock.getPositionCount();
+ currentTsBlock = null;
+ } else {
+ // traverse the current TsBlock backward until encountering a row with a StreamCompKey
+ // different from the last row, and return the index of that row
+ int endIndex = getEndIndexFromCurrentTsBlock();
+ // if total count of `canOutput` lines is less than minLinesToOutput, we won't output
+ // them, we just cache the whole currentTsBlock
+ if (endIndex == -1 || endIndex + remainingCount + 1 <
minLinesToOutput) {
+ cacheTsBlock(currentTsBlock);
+ remainingCount += currentTsBlock.getPositionCount();
+ currentTsBlock = null;
+ } else {
+ // `canOutput` lines count is larger than minLinesToOutput, so we can stream output
+ cacheTsBlock(currentTsBlock.getRegion(0, endIndex + 1));
+ remainingCount += currentTsBlock.getPositionCount();
+ canStreamOutput = true;
+ currentTsBlock = currentTsBlock.subTsBlock(endIndex + 1);
+ }
+ }
+ }
+ } catch (IoTDBException e) {
+ clear();
+ throw e;
+ } finally {
+ prepareUntilReadyCost += System.nanoTime() - startTime;
+ }
+ }
+ return null;
+ }
+
+ private boolean consumedUp() {
+ return remainingCount == tsBlockBuilder.getPositionCount() &&
noMoreDataFromChild;
+ }
+
+ // return -1, if not found
+ private int getEndIndexFromCurrentTsBlock() {
+ SortKey sortKeyOfLastRow = new SortKey(currentTsBlock,
currentTsBlock.getPositionCount() - 1);
+ SortKey sortKeyOfFoundRow = new SortKey(currentTsBlock,
currentTsBlock.getPositionCount() - 2);
+ for (int index = currentTsBlock.getPositionCount() - 2; index >= 0;
index--) {
+ sortKeyOfFoundRow.rowIndex = index;
+ if (streamSortComparator.compare(sortKeyOfFoundRow, sortKeyOfLastRow) !=
0) {
+ return index;
+ }
+ }
+ return -1;
+ }
+
+ private boolean isStreamCompareKeySame() {
+ return lastRow == null
+ || streamSortComparator.compare(
+ lastRow, new SortKey(currentTsBlock,
currentTsBlock.getPositionCount() - 1))
+ == 0;
+ }
+
+ protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+ super.cacheTsBlock(tsBlock);
+ lastRow = new SortKey(tsBlock, tsBlock.getPositionCount() - 1);
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return super.hasNext() || !tsBlockBuilder.isEmpty() || currentTsBlock !=
null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ lastRow = null;
+ currentTsBlock = null;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ + RamUsageEstimator.sizeOf(noMoreData)
+ + tsBlockBuilder.getRetainedSizeInBytes();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 73b333a8c86..d7d62ec2e86 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
@@ -63,6 +64,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Expression;
@@ -240,6 +242,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
SeriesScanOptions.Builder scanOptionsBuilder =
getSeriesScanOptionsBuilder(context);
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
+
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
Expression pushDownPredicate = node.getPushDownPredicate();
@@ -711,4 +714,54 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
}
});
}
+
+ @Override
+ public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ StreamSortNode.class.getSimpleName());
+ List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+ int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
+
+ List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount);
+ List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount);
+ genSortInformation(
+ node.getOutputSymbols(),
+ node.getOrderingScheme(),
+ sortItemIndexList,
+ sortItemDataTypeList,
+ context.getTypeProvider());
+
+ String filePrefix =
+ IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+ + File.separator
+ + operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId()
+ + File.separator
+ + operatorContext.getDriverContext().getPipelineId()
+ + File.separator;
+
+ context.getDriverContext().setHaveTmpFile(true);
+ context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true);
+
+ Operator child = node.getChild().accept(this, context);
+
+ return new StreamSortOperator(
+ operatorContext,
+ child,
+ dataTypes,
+ filePrefix,
+ getComparatorForTable(
+ node.getOrderingScheme().getOrderingList(), sortItemIndexList, sortItemDataTypeList),
+ getComparatorForTable(
+ node.getOrderingScheme()
+ .getOrderingList()
+ .subList(0, node.getStreamCompareKeyEndIndex() + 1),
+ sortItemIndexList.subList(0, node.getStreamCompareKeyEndIndex() + 1),
+ sortItemDataTypeList.subList(0, node.getStreamCompareKeyEndIndex() + 1)),
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index c39e4f6ea18..1bd6892541c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -37,24 +37,29 @@ public class SeriesScanOptions {
private Filter globalTimeFilter;
- private Filter pushDownFilter;
+ private final Filter pushDownFilter;
private final long pushDownLimit;
private final long pushDownOffset;
private final Set<String> allSensors;
+ private final boolean pushLimitToEachDevice;
+ private PaginationController paginationController;
+
public SeriesScanOptions(
Filter globalTimeFilter,
Filter pushDownFilter,
long pushDownLimit,
long pushDownOffset,
- Set<String> allSensors) {
+ Set<String> allSensors,
+ boolean pushLimitToEachDevice) {
this.globalTimeFilter = globalTimeFilter;
this.pushDownFilter = pushDownFilter;
this.pushDownLimit = pushDownLimit;
this.pushDownOffset = pushDownOffset;
this.allSensors = allSensors;
+ this.pushLimitToEachDevice = pushLimitToEachDevice;
}
public static SeriesScanOptions getDefaultSeriesScanOptions(IFullPath
seriesPath) {
@@ -82,7 +87,14 @@ public class SeriesScanOptions {
}
public PaginationController getPaginationController() {
- return new PaginationController(pushDownLimit, pushDownOffset);
+ if (pushLimitToEachDevice) {
+ return new PaginationController(pushDownLimit, pushDownOffset);
+ } else {
+ if (paginationController == null) {
+ paginationController = new PaginationController(pushDownLimit, pushDownOffset);
+ }
+ return paginationController;
+ }
}
public void setTTL(long dataTTL) {
@@ -114,6 +126,8 @@ public class SeriesScanOptions {
private Set<String> allSensors;
+ private boolean pushLimitToEachDevice = true;
+
public Builder withGlobalTimeFilter(Filter globalTimeFilter) {
this.globalTimeFilter = globalTimeFilter;
return this;
@@ -134,13 +148,23 @@ public class SeriesScanOptions {
return this;
}
+ public Builder withPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+ this.pushLimitToEachDevice = pushLimitToEachDevice;
+ return this;
+ }
+
public void withAllSensors(Set<String> allSensors) {
this.allSensors = allSensors;
}
public SeriesScanOptions build() {
return new SeriesScanOptions(
- globalTimeFilter, pushDownFilter, pushDownLimit, pushDownOffset, allSensors);
+ globalTimeFilter,
+ pushDownFilter,
+ pushDownLimit,
+ pushDownOffset,
+ allSensors,
+ pushLimitToEachDevice);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
index c27b176bd9f..b0d82725346 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java
@@ -104,11 +104,16 @@ public class DiskSpiller {
}
}
- private void writeData(List<TsBlock> sortedData, String fileName)
- throws IOException, IoTDBException {
+ private void writeData(List<TsBlock> sortedData, String fileName) throws IoTDBException {
Path filePath = Paths.get(fileName);
- Files.createFile(filePath);
- try (FileChannel fileChannel = FileChannel.open(filePath, StandardOpenOption.WRITE)) {
+ // for stream sort we may reuse the previous tmp file name, so we need TRUNCATE_EXISTING and
+ // CREATE
+ try (FileChannel fileChannel =
+ FileChannel.open(
+ filePath,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.TRUNCATE_EXISTING,
+ StandardOpenOption.CREATE)) {
for (TsBlock tsBlock : sortedData) {
ByteBuffer tsBlockBuffer = serde.serialize(tsBlock);
ByteBuffer length = ByteBuffer.allocate(4);
@@ -168,4 +173,8 @@ public class DiskSpiller {
public int getFileSize() {
return fileIndex;
}
+
+ public void reset() {
+ fileIndex = 0;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
index 8f2de51db7f..579f2a99b14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortBufferManager.java
@@ -19,33 +19,30 @@
package org.apache.iotdb.db.utils.sort;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import org.apache.tsfile.common.conf.TSFileDescriptor;
-
public class SortBufferManager {
- private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
- TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
- public static final long SORT_BUFFER_SIZE =
- IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ private final int maxTsBlockSizeInBytes;
+ private final long sortBufferSize;
private long bufferUsed;
- private static final long BUFFER_SIZE_FOR_ONE_BRANCH = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ private final long bufferSizeForOneBranch;
private final long bufferAvailableForAllBranch;
private long readerBuffer = 0;
private long branchNum = 0;
- public SortBufferManager() {
- this.bufferAvailableForAllBranch = SORT_BUFFER_SIZE - DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ public SortBufferManager(int maxTsBlockSizeInBytes, long sortBufferSize) {
+ this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+ this.sortBufferSize = sortBufferSize;
+ this.bufferAvailableForAllBranch = sortBufferSize - maxTsBlockSizeInBytes;
+ this.bufferSizeForOneBranch = maxTsBlockSizeInBytes;
// the initial value is the buffer for output.
- this.bufferUsed = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ this.bufferUsed = maxTsBlockSizeInBytes;
}
public void allocateOneSortBranch() {
- boolean success = allocate(BUFFER_SIZE_FOR_ONE_BRANCH);
+ boolean success = allocate(bufferSizeForOneBranch);
if (!success) {
throw new IllegalArgumentException("Not enough memory for sorting");
}
@@ -53,7 +50,7 @@ public class SortBufferManager {
}
private boolean check(long size) {
- return bufferUsed + size < SORT_BUFFER_SIZE;
+ return bufferUsed + size < sortBufferSize;
}
public boolean allocate(long size) {
@@ -78,4 +75,12 @@ public class SortBufferManager {
readerBuffer = bufferAvailableForAllBranch / branchNum;
return readerBuffer;
}
+
+ public int getMaxTsBlockSizeInBytes() {
+ return maxTsBlockSizeInBytes;
+ }
+
+ public long getSortBufferSize() {
+ return sortBufferSize;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
index da098b19801..99acfd45366 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -79,6 +80,9 @@ public class SortOperatorTest {
private final List<TsFileResource> seqResources = new ArrayList<>();
private final List<TsFileResource> unSeqResources = new ArrayList<>();
+ private final String sortTmpPrefixPath =
+ "target" + File.separator + "sort" + File.separator + "tmp";
+
private int dataNodeId;
private int maxTsBlockSizeInBytes;
@@ -96,6 +100,7 @@ public class SortOperatorTest {
@After
public void tearDown() throws IOException {
SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ cleanDir(sortTmpPrefixPath);
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
}
@@ -197,7 +202,7 @@ public class SortOperatorTest {
OperatorContext operatorContext =
driverContext.getOperatorContexts().get(3);
String filePrefix =
- "target"
+ sortTmpPrefixPath
+ File.separator
+ operatorContext
.getDriverContext()
@@ -228,10 +233,9 @@ public class SortOperatorTest {
// with data spilling
@Test
public void sortOperatorSpillingTest() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
long sortBufferSize =
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
- try {
- IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(5000);
- SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, true);
+ try (SortOperator root = (SortOperator) genSortOperator(Ordering.ASC, true)) {
int lastValue = -1;
int count = 0;
while (root.isBlocked().isDone() && root.hasNext()) {
@@ -248,7 +252,6 @@ public class SortOperatorTest {
count++;
}
}
- root.close();
assertEquals(500, count);
} finally {
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
@@ -258,24 +261,24 @@ public class SortOperatorTest {
// no data spilling
@Test
public void sortOperatorNormalTest() throws Exception {
- Operator root = genSortOperator(Ordering.ASC, true);
- int lastValue = -1;
- int count = 0;
- while (root.isBlocked().isDone() && root.hasNext()) {
- TsBlock tsBlock = root.next();
- if (tsBlock == null) continue;
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- long time = tsBlock.getTimeByIndex(i);
- int v1 = tsBlock.getColumn(0).getInt(i);
- int v2 = tsBlock.getColumn(1).getInt(i);
- assertTrue(lastValue == -1 || lastValue < v1);
- assertEquals(getValue(time), v1);
- assertEquals(v1, v2);
- lastValue = v1;
- count++;
+ try (Operator root = genSortOperator(Ordering.ASC, true)) {
+ int lastValue = -1;
+ int count = 0;
+ while (root.isBlocked().isDone() && root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock == null) continue;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeByIndex(i);
+ int v1 = tsBlock.getColumn(0).getInt(i);
+ int v2 = tsBlock.getColumn(1).getInt(i);
+ assertTrue(lastValue == -1 || lastValue < v1);
+ assertEquals(getValue(time), v1);
+ assertEquals(v1, v2);
+ lastValue = v1;
+ count++;
+ }
}
+ assertEquals(500, count);
}
- root.close();
- assertEquals(500, count);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
new file mode 100644
index 00000000000..f273a183604
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperatorTest.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class StreamSortOperatorTest {
+
+ private static final String sortTmpPrefixPath =
+ "target" + File.separator + "sort" + File.separator + "tmp";
+
+ private static final ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"sortOperator-test-instance-notification");
+
+ private final long[] timeArray =
+ new long[] {
+ 3L, 4L, 2L, 1L, 3L, 1L, 2L, 4L, 5L, 5L, 2L, 3L, 1L, 4L, 2L, 3L, 1L,
4L, 5L, 1L, 2L, 3L, 4L,
+ 5L, 4L, 1L, 2L, 5L, 3L, 5L, 4L, 3L, 2L, 1L
+ };
+ private final String[] column1Array =
+ new String[] {
+ null,
+ null,
+ null,
+ null,
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "beijing",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "shanghai",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou",
+ "yangzhou"
+ };
+ private final boolean[] column1IsNull =
+ new boolean[] {
+ true, true, true, true, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false
+ };
+ private final String[] column2Array =
+ new String[] {
+ "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1", "d2", "d2",
"d2", "d2", "d2", "d1",
+ "d1", "d1", "d1", "d1", "d2", "d2", "d2", "d2", "d2", "d1", "d1",
"d1", "d1", "d1", "d2",
+ "d2", "d2", "d2", "d2"
+ };
+ private final boolean[] column2IsNull =
+ new boolean[] {
+ false, false, false, false, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false, false, false,
false, false, false,
+ false, false, false, false, false, false, false, false
+ };
+ private final int[] column3Array =
+ new int[] {
+ 6, 7, 8, 9, 0, 111, 112, 114, 115, 0, 121, 122, 123, 124, 0, 11, 12,
14, 15, 21, 22, 23, 24,
+ 25, 0, 11, 12, 13, 15, 21, 22, 23, 24, 25
+ };
+ private final boolean[] column3IsNull =
+ new boolean[] {
+ false, false, false, false, true, false, false, false, false, true,
false, false, false,
+ false, true, false, false, false, false, false, false, false, false,
false, true, false,
+ false, false, false, false, false, false, false, false
+ };
+
+ @After
+ public void cleanUp() throws IOException {
+ cleanDir(sortTmpPrefixPath);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ instanceNotificationExecutor.shutdown();
+ }
+
+ @Test
+ public void allInMemoryTest() {
+
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void allInMemoryTes2() {
+ int maxTsBlockLineNumber =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ assertEquals(2, tsBlock.getPositionCount());
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+ }
+ }
+
+ @Test
+ public void someInDiskTest() {
+
+ long sortBufferSize =
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ int maxTsBlockSizeInBytes =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(1000)) {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+ }
+ }
+
+ @Test
+ public void someInDiskTest2() {
+ long sortBufferSize =
IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+ int maxTsBlockSizeInBytes =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ int maxTsBlockLineNumber =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+ IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(500);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(50);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(2);
+ try (StreamSortOperator streamSortOperator = genStreamSortOperator(2)) {
+
+ int count = 0;
+ ListenableFuture<?> listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ while (!streamSortOperator.isFinished() && streamSortOperator.hasNext())
{
+ TsBlock tsBlock = streamSortOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ assertEquals(2, tsBlock.getPositionCount());
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(
+ column1Array[count],
+
tsBlock.getColumn(0).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(
+ column2Array[count],
+
tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ listenableFuture = streamSortOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+
IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxTsBlockLineNumber);
+ }
+ }
+
+ private StreamSortOperator genStreamSortOperator(int maxLinesToOutput) {
+ // child output
+ // Time, city, deviceId, s1
+ // 1 null d1 9
+ // 2 null d1 8
+ // ---------------------- TsBlock-1
+ // 3 null d1 6
+ // 4 null d1 7
+ // 1 beijing d1 111
+ // 2 beijing d1 112
+ // 3 beijing d1 null
+ // 4 beijing d1 114
+ // 5 beijing d1 115
+ // ---------------------- TsBlock-2
+ // 1 beijing d2 123
+ // 2 beijing d2 121
+ // 3 beijing d2 122
+ // 4 beijing d2 124
+ // 5 beijing d2 null
+ // ---------------------- TsBlock-3
+ // null
+ // ---------------------- TsBlock-4
+ // 1 shanghai d1 12
+ // 2 shanghai d1 null
+ // 3 shanghai d1 11
+ // 4 shanghai d1 14
+ // 5 shanghai d1 15
+ // ---------------------- TsBlock-5
+ // 1 shanghai d2 21
+ // 2 shanghai d2 22
+ // 3 shanghai d2 23
+ // 4 shanghai d2 24
+ // 5 shanghai d2 25
+ // 1 yangzhou d1 11
+ // 2 yangzhou d1 12
+ // ---------------------- TsBlock-6
+ // 3 yangzhou d1 15
+ // 4 yangzhou d1 null
+ // 5 yangzhou d1 13
+ // ---------------------- TsBlock-7
+ // empty
+ // ---------------------- TsBlock-8
+ // 1 yangzhou d2 25
+ // 2 yangzhou d2 24
+ // 3 yangzhou d2 23
+ // 4 yangzhou d2 22
+ // ---------------------- TsBlock-9
+ // 5 yangzhou d2 21
+ // ---------------------- TsBlock-10
+
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
TableScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
StreamSortOperator.class.getSimpleName());
+ Operator childOperator =
+ new Operator() {
+
+ private final long[][] timeArray =
+ new long[][] {
+ {1L, 2L},
+ {3L, 4L, 1L, 2L, 3L, 4L, 5L},
+ {1L, 2L, 3L, 4L, 5L},
+ null,
+ {1L, 2L, 3L, 4L, 5L},
+ {1L, 2L, 3L, 4L, 5L, 1L, 2L},
+ {3L, 4L, 5L},
+ {},
+ {1L, 2L, 3L, 4L},
+ {5L}
+ };
+
+ private final String[][] cityArray =
+ new String[][] {
+ {null, null},
+ {null, null, "beijing", "beijing", "beijing", "beijing",
"beijing"},
+ {"beijing", "beijing", "beijing", "beijing", "beijing"},
+ null,
+ {"shanghai", "shanghai", "shanghai", "shanghai", "shanghai"},
+ {
+ "shanghai", "shanghai", "shanghai", "shanghai", "shanghai",
"yangzhou", "yangzhou"
+ },
+ {"yangzhou", "yangzhou", "yangzhou"},
+ {},
+ {"yangzhou", "yangzhou", "yangzhou", "yangzhou"},
+ {"yangzhou"}
+ };
+
+ private final String[][] deviceIdArray =
+ new String[][] {
+ {"d1", "d1"},
+ {"d1", "d1", "d1", "d1", "d1", "d1", "d1"},
+ {"d2", "d2", "d2", "d2", "d2"},
+ null,
+ {"d1", "d1", "d1", "d1", "d1"},
+ {"d2", "d2", "d2", "d2", "d2", "d1", "d1"},
+ {"d1", "d1", "d1"},
+ {},
+ {"d2", "d2", "d2", "d2"},
+ {"d2"}
+ };
+
+ private final int[][] valueArray =
+ new int[][] {
+ {9, 8},
+ {6, 7, 111, 112, 0, 114, 115},
+ {123, 121, 122, 124, 0},
+ null,
+ {12, 0, 11, 14, 15},
+ {21, 22, 23, 24, 25, 11, 12},
+ {15, 0, 13},
+ {},
+ {25, 24, 23, 22},
+ {21}
+ };
+
+ private final boolean[][] valueIsNull =
+ new boolean[][] {
+ {false, false},
+ {false, false, false, false, true, false, false},
+ {false, false, false, false, true},
+ null,
+ {false, true, false, false, false},
+ {false, false, false, false, false, false, false},
+ {false, true, false},
+ {},
+ {false, false, false, false},
+ {false}
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return driverContext.getOperatorContexts().get(0);
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length,
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT,
TSDataType.INT32));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (cityArray[index][i] == null) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder
+ .getColumnBuilder(0)
+ .writeBinary(new Binary(cityArray[index][i],
TSFileConfig.STRING_CHARSET));
+ }
+ if (deviceIdArray[index][i] == null) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+ builder
+ .getColumnBuilder(1)
+ .writeBinary(new Binary(deviceIdArray[index][i],
TSFileConfig.STRING_CHARSET));
+ }
+ if (valueIsNull[index][i]) {
+ builder.getColumnBuilder(2).appendNull();
+ } else {
+ builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return index < valueIsNull.length;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean isFinished() throws Exception {
+ return index >= valueIsNull.length;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ };
+
+ OperatorContext operatorContext =
driverContext.getOperatorContexts().get(1);
+ String filePrefix =
+ sortTmpPrefixPath
+ + File.separator
+ + operatorContext
+ .getDriverContext()
+ .getFragmentInstanceContext()
+ .getId()
+ .getFragmentInstanceId()
+ + File.separator
+ + operatorContext.getDriverContext().getPipelineId()
+ + File.separator;
+
+ List<SortOrder> sortOrderList =
+ Arrays.asList(
+ SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST,
SortOrder.ASC_NULLS_FIRST);
+ List<Integer> sortItemIndexList = Arrays.asList(0, 1, 2);
+ List<TSDataType> sortItemDataTypeList =
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32);
+
+ Comparator<SortKey> comparator =
+ getComparatorForTable(sortOrderList, sortItemIndexList,
sortItemDataTypeList);
+ ;
+
+ Comparator<SortKey> streamKeyComparator =
+ getComparatorForTable(
+ sortOrderList.subList(0, 2),
+ sortItemIndexList.subList(0, 2),
+ sortItemDataTypeList.subList(0, 2));
+
+ return new StreamSortOperator(
+ operatorContext,
+ childOperator,
+ Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32),
+ filePrefix,
+ comparator,
+ streamKeyComparator,
+ maxLinesToOutput);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
index b164677ae6c..98d6f765d4c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/sort/SortUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.sort;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -94,7 +95,10 @@ public class SortUtilTest {
sortKeyList.add(new SortKey(tsBlock, i));
}
- SortBufferManager sortBufferManager = new SortBufferManager();
+ SortBufferManager sortBufferManager =
+ new SortBufferManager(
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ IoTDBDescriptor.getInstance().getConfig().getSortBufferSize());
try {
sortBufferManager.allocateOneSortBranch();
diskSpiller.spillSortedData(sortKeyList);