This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TableScanEachLimit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6bc4c5eb4849441f0833ae62e84386c0a5ed796e Author: JackieTien97 <[email protected]> AuthorDate: Thu Jun 6 16:55:10 2024 +0800 Add support for StreamSortOperator --- ...SortOperator.java => AbstractSortOperator.java} | 127 ++++----- .../execution/operator/process/SortOperator.java | 309 +-------------------- .../operator/process/StreamSortOperator.java | 203 ++++++++++++++ .../plan/planner/TableOperatorGenerator.java | 53 ++++ .../planner/plan/parameter/SeriesScanOptions.java | 32 ++- .../apache/iotdb/db/utils/sort/DiskSpiller.java | 4 + 6 files changed, 347 insertions(+), 381 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java similarity index 79% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java index e34112c2903..bb8c7a16c2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java @@ -17,14 +17,11 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.execution.operator.process.relational; +package org.apache.iotdb.db.queryengine.execution.operator.process; import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; -import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; import org.apache.iotdb.db.utils.datastructure.MergeSortHeap; import org.apache.iotdb.db.utils.datastructure.MergeSortKey; import org.apache.iotdb.db.utils.datastructure.SortKey; @@ -40,7 +37,6 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; -import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,15 +48,13 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE; -public class StreamSortOperator implements ProcessOperator { +public abstract class AbstractSortOperator implements ProcessOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(StreamSortOperator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSortOperator.class); - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(SortOperator.class); - private final OperatorContext operatorContext; - private final Operator inputOperator; - private final TsBlockBuilder tsBlockBuilder; + protected final OperatorContext operatorContext; + protected final Operator inputOperator; + protected final TsBlockBuilder tsBlockBuilder; // Use to output the result in memory. // Because the memory may be larger than tsBlockBuilder's max size @@ -71,22 +65,22 @@ public class StreamSortOperator implements ProcessOperator { private final Comparator<SortKey> comparator; private long cachedBytes; private final DiskSpiller diskSpiller; - private final SortBufferManager sortBufferManager; + private SortBufferManager sortBufferManager; // For mergeSort private MergeSortHeap mergeSortHeap; private List<SortReader> sortReaders; - private boolean[] noMoreData; + protected boolean[] noMoreData; private final int maxReturnSize = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); - private long prepareUntilReadyCost = 0; - private long dataSize = 0; + protected long prepareUntilReadyCost = 0; + protected long dataSize = 0; private long sortCost = 0; - public StreamSortOperator( + AbstractSortOperator( OperatorContext operatorContext, Operator inputOperator, List<TSDataType> dataTypes, @@ -103,6 +97,26 @@ public class StreamSortOperator implements ProcessOperator { this.sortBufferManager = new SortBufferManager(); } + protected void buildResult() throws IoTDBException { + if (diskSpiller.hasSpilledData()) { + try { + prepareSortReaders(); + mergeSort(); + } catch (Exception e) { + clear(); + throw e; + } + } else { + if (curRow == -1) { + long startTime = System.nanoTime(); + cachedData.sort(comparator); + sortCost += System.nanoTime() - startTime; + curRow = 0; + } + buildTsBlockInMemory(); + } + } + @Override public OperatorContext getOperatorContext() { return operatorContext; @@ -113,45 +127,6 @@ public class StreamSortOperator implements ProcessOperator { return inputOperator.isBlocked(); } - @Override - public TsBlock next() throws Exception { - if (!inputOperator.hasNextWithTimer()) { - if (diskSpiller.hasSpilledData()) { - try { - prepareSortReaders(); - return mergeSort(); - } catch (Exception e) { - clear(); - throw e; - } - } else { - if (curRow == -1) { - long startTime = System.nanoTime(); - cachedData.sort(comparator); - sortCost += System.nanoTime() - startTime; - curRow = 0; - } - return buildTsBlockInMemory(); - } - } - long startTime = System.nanoTime(); - try { - TsBlock tsBlock = inputOperator.nextWithTimer(); - if (tsBlock == null) { - return null; - } - dataSize += tsBlock.getRetainedSizeInBytes(); - cacheTsBlock(tsBlock); - } catch (IoTDBException e) { - clear(); - throw e; - } finally { - prepareUntilReadyCost += System.nanoTime() - startTime; - } - - return null; - } - private void recordMetrics() { operatorContext.recordSpecifiedInfo("prepareCost/ns", Long.toString(prepareUntilReadyCost)); operatorContext.recordSpecifiedInfo("sortedDataSize", Long.toString(dataSize)); @@ -185,7 +160,7 @@ public class StreamSortOperator implements ProcessOperator { noMoreData = new boolean[sortReaders.size()]; } - private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException { + protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException { long bytesSize = tsBlock.getRetainedSizeInBytes(); if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) { cachedBytes += bytesSize; @@ -211,8 +186,7 @@ public class StreamSortOperator implements ProcessOperator { diskSpiller.spillSortedData(cachedData); } - private TsBlock buildTsBlockInMemory() { - tsBlockBuilder.reset(); + private void buildTsBlockInMemory() { TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); for (int i = curRow; i < cachedData.size(); i++) { @@ -232,11 +206,9 @@ public class StreamSortOperator implements ProcessOperator { break; } } - - return tsBlockBuilder.build(); } - private TsBlock mergeSort() throws IoTDBException { + private void mergeSort() throws IoTDBException { // 1. fill the input from each reader initMergeSortHeap(); @@ -245,7 +217,6 @@ public class StreamSortOperator implements ProcessOperator { long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); // 2. do merge sort until one TsBlock is consumed up - tsBlockBuilder.reset(); TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); while (!mergeSortHeap.isEmpty()) { @@ -277,7 +248,6 @@ public class StreamSortOperator implements ProcessOperator { } } sortCost += System.nanoTime() - startTime; - return tsBlockBuilder.build(); } private void initMergeSortHeap() throws IoTDBException { @@ -329,15 +299,20 @@ public class StreamSortOperator implements ProcessOperator { sortReader.close(); } } + sortReaders = null; + diskSpiller.reset(); } catch (Exception e) { - LOGGER.error("Fail to close fileChannel", e); + LOGGER.warn("Fail to close fileChannel", e); } } @Override public boolean hasNext() throws Exception { - return inputOperator.hasNextWithTimer() - || (!diskSpiller.hasSpilledData() && curRow != cachedData.size()) + return inputOperator.hasNextWithTimer() || hasMoreSortedData(); + } + + protected boolean hasMoreSortedData() { + return (!diskSpiller.hasSpilledData() && curRow != cachedData.size()) || (diskSpiller.hasSpilledData() && hasMoreData()); } @@ -371,12 +346,16 @@ public class StreamSortOperator implements ProcessOperator { return inputOperator.calculateRetainedSizeAfterCallingNext() + SORT_BUFFER_SIZE; } - @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + RamUsageEstimator.sizeOf(noMoreData) - + tsBlockBuilder.getRetainedSizeInBytes(); + protected void resetSortRelatedResource() { + curRow = -1; + cachedData.clear(); + cachedBytes = 0; + clear(); + sortBufferManager = new SortBufferManager(); + if (!mergeSortHeap.isEmpty()) { + throw new IllegalStateException("mergeSortHeap should be empty!"); + } + mergeSortHeap = null; + noMoreData = null; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java index 55453353f0d..0f9ca2bd848 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java @@ -23,66 +23,19 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.utils.datastructure.MergeSortHeap; -import org.apache.iotdb.db.utils.datastructure.MergeSortKey; import org.apache.iotdb.db.utils.datastructure.SortKey; -import org.apache.iotdb.db.utils.sort.DiskSpiller; -import org.apache.iotdb.db.utils.sort.MemoryReader; -import org.apache.iotdb.db.utils.sort.SortBufferManager; -import org.apache.iotdb.db.utils.sort.SortReader; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.tsfile.block.column.ColumnBuilder; -import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; -import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.RamUsageEstimator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import static org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE; - -public class SortOperator implements ProcessOperator { +public class SortOperator extends AbstractSortOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(SortOperator.class); - private final OperatorContext operatorContext; - private final Operator inputOperator; - private final TsBlockBuilder tsBlockBuilder; - - // Use to output the result in memory. - // Because the memory may be larger than tsBlockBuilder's max size - // so the data may be return in multiple times. - private int curRow = -1; - - private List<SortKey> cachedData; - private final Comparator<SortKey> comparator; - private long cachedBytes; - private final DiskSpiller diskSpiller; - private final SortBufferManager sortBufferManager; - - // For mergeSort - - private MergeSortHeap mergeSortHeap; - private List<SortReader> sortReaders; - private boolean[] noMoreData; - - private static final Logger logger = LoggerFactory.getLogger(SortOperator.class); - - private final int maxReturnSize = - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); - - private long prepareUntilReadyCost = 0; - private long dataSize = 0; - private long sortCost = 0; public SortOperator( OperatorContext operatorContext, @@ -90,47 +43,16 @@ public class SortOperator implements ProcessOperator { List<TSDataType> dataTypes, String folderPath, Comparator<SortKey> comparator) { - this.operatorContext = operatorContext; - this.inputOperator = inputOperator; - this.tsBlockBuilder = new TsBlockBuilder(dataTypes); - this.cachedData = new ArrayList<>(); - this.comparator = comparator; - this.cachedBytes = 0; - this.diskSpiller = - new DiskSpiller(folderPath, folderPath + operatorContext.getOperatorId(), dataTypes); - this.sortBufferManager = new SortBufferManager(); - } - - @Override - public OperatorContext getOperatorContext() { - return operatorContext; - } - - @Override - public ListenableFuture<?> isBlocked() { - return inputOperator.isBlocked(); + super(operatorContext, inputOperator, dataTypes, folderPath, comparator); } @Override public TsBlock next() throws Exception { if (!inputOperator.hasNextWithTimer()) { - if (diskSpiller.hasSpilledData()) { - try { - prepareSortReaders(); - return mergeSort(); - } catch (Exception e) { - clear(); - throw e; - } - } else { - if (curRow == -1) { - long startTime = System.nanoTime(); - cachedData.sort(comparator); - sortCost += System.nanoTime() - startTime; - curRow = 0; - } - return buildTsBlockInMemory(); - } + buildResult(); + TsBlock res = tsBlockBuilder.build(); + tsBlockBuilder.reset(); + return res; } long startTime = System.nanoTime(); try { @@ -150,225 +72,6 @@ public class SortOperator implements ProcessOperator { return null; } - private void recordMetrics() { - operatorContext.recordSpecifiedInfo("prepareCost/ns", Long.toString(prepareUntilReadyCost)); - operatorContext.recordSpecifiedInfo("sortedDataSize", Long.toString(dataSize)); - operatorContext.recordSpecifiedInfo("sortCost/ns", Long.toString(sortCost)); - int spilledFileSize = diskSpiller.getFileSize(); - if (spilledFileSize > 0) { - operatorContext.recordSpecifiedInfo( - "merge sort branch", Integer.toString(diskSpiller.getFileSize() + 1)); - } - } - - private void prepareSortReaders() throws IoTDBException { - if (sortReaders != null) { - return; - } - sortReaders = new ArrayList<>(); - if (cachedBytes != 0) { - cachedData.sort(comparator); - if (sortBufferManager.allocate(cachedBytes)) { - sortReaders.add( - new MemoryReader( - cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList()))); - } else { - sortBufferManager.allocateOneSortBranch(); - diskSpiller.spillSortedData(cachedData); - cachedData = null; - } - } - sortReaders.addAll(diskSpiller.getReaders(sortBufferManager)); - // if reader is finished - noMoreData = new boolean[sortReaders.size()]; - } - - private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException { - long bytesSize = tsBlock.getRetainedSizeInBytes(); - if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) { - cachedBytes += bytesSize; - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - cachedData.add(new MergeSortKey(tsBlock, i)); - } - } else { - cachedData.sort(comparator); - spill(); - cachedData.clear(); - cachedBytes = bytesSize; - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - cachedData.add(new MergeSortKey(tsBlock, i)); - } - } - } - - private void spill() throws IoTDBException { - // if current memory cannot put this tsBlock, an exception will be thrown in spillSortedData() - // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for - // one branch. - sortBufferManager.allocateOneSortBranch(); - diskSpiller.spillSortedData(cachedData); - } - - private TsBlock buildTsBlockInMemory() { - tsBlockBuilder.reset(); - TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); - ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); - for (int i = curRow; i < cachedData.size(); i++) { - SortKey sortKey = cachedData.get(i); - TsBlock tsBlock = sortKey.tsBlock; - timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex)); - for (int j = 0; j < valueColumnBuilders.length; j++) { - if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) { - valueColumnBuilders[j].appendNull(); - continue; - } - valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex); - } - tsBlockBuilder.declarePosition(); - curRow++; - if (tsBlockBuilder.isFull()) { - break; - } - } - - return tsBlockBuilder.build(); - } - - private TsBlock mergeSort() throws IoTDBException { - - // 1. fill the input from each reader - initMergeSortHeap(); - - long startTime = System.nanoTime(); - long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); - - // 2. do merge sort until one TsBlock is consumed up - tsBlockBuilder.reset(); - TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); - ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); - while (!mergeSortHeap.isEmpty()) { - - MergeSortKey mergeSortKey = mergeSortHeap.poll(); - TsBlock targetBlock = mergeSortKey.tsBlock; - timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex)); - for (int i = 0; i < valueColumnBuilders.length; i++) { - if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) { - valueColumnBuilders[i].appendNull(); - continue; - } - valueColumnBuilders[i].write(targetBlock.getColumn(i), mergeSortKey.rowIndex); - } - tsBlockBuilder.declarePosition(); - - int readerIndex = mergeSortKey.inputChannelIndex; - mergeSortKey = readNextMergeSortKey(readerIndex); - if (mergeSortKey != null) { - mergeSortHeap.push(mergeSortKey); - } else { - noMoreData[readerIndex] = true; - sortBufferManager.releaseOneSortBranch(); - } - - // break if time is out or tsBlockBuilder is full or sortBuffer is not enough - if (System.nanoTime() - startTime > maxRuntime || tsBlockBuilder.isFull()) { - break; - } - } - sortCost += System.nanoTime() - startTime; - return tsBlockBuilder.build(); - } - - private void initMergeSortHeap() throws IoTDBException { - if (mergeSortHeap == null) { - mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator); - for (int i = 0; i < sortReaders.size(); i++) { - SortReader sortReader = sortReaders.get(i); - if (sortReader.hasNext()) { - MergeSortKey mergeSortKey = sortReader.next(); - mergeSortKey.inputChannelIndex = i; - mergeSortHeap.push(mergeSortKey); - } else { - noMoreData[i] = true; - sortBufferManager.releaseOneSortBranch(); - } - } - } - } - - private MergeSortKey readNextMergeSortKey(int readerIndex) throws IoTDBException { - SortReader sortReader = sortReaders.get(readerIndex); - if (sortReader.hasNext()) { - MergeSortKey mergeSortKey = sortReader.next(); - mergeSortKey.inputChannelIndex = readerIndex; - return mergeSortKey; - } - return null; - } - - private boolean hasMoreData() { - if (noMoreData == null) { - return true; - } - for (boolean noMore : noMoreData) { - if (!noMore) { - return true; - } - } - return false; - } - - public void clear() { - if (!diskSpiller.hasSpilledData()) { - return; - } - try { - if (sortReaders != null) { - for (SortReader sortReader : sortReaders) { - sortReader.close(); - } - } - } catch (Exception e) { - logger.error("Fail to close fileChannel", e); - } - } - - @Override - public boolean hasNext() throws Exception { - return inputOperator.hasNextWithTimer() - || (!diskSpiller.hasSpilledData() && curRow != cachedData.size()) - || (diskSpiller.hasSpilledData() && hasMoreData()); - } - - @Override - public void close() throws Exception { - recordMetrics(); - cachedData = null; - clear(); - inputOperator.close(); - } - - @Override - public boolean isFinished() throws Exception { - return !this.hasNextWithTimer(); - } - - @Override - public long calculateMaxPeekMemory() { - return inputOperator.calculateMaxPeekMemoryWithCounter() - + inputOperator.calculateRetainedSizeAfterCallingNext() - + SORT_BUFFER_SIZE; - } - - @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return inputOperator.calculateRetainedSizeAfterCallingNext() + SORT_BUFFER_SIZE; - } - @Override public long ramBytesUsed() { return INSTANCE_SIZE diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java new file mode 100644 index 00000000000..d53a3480f93 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/StreamSortOperator.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.utils.datastructure.SortKey; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Comparator; +import java.util.List; + +public class StreamSortOperator extends AbstractSortOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(StreamSortOperator.class); + + private final Comparator<SortKey> streamSortComparator; + + private final int minLinesToOutput; + + // accumulated unprinted line count + private long remainingCount = 0; + + // child operator has no more data + private boolean noMoreDataFromChild = false; + + // unprocessed data from the previous iteration + private TsBlock currentTsBlock = null; + + // be able to start stream outputting partial data + private boolean canStreamOutput = false; + + private SortKey lastRow = null; + + public StreamSortOperator( + OperatorContext operatorContext, + Operator inputOperator, + List<TSDataType> dataTypes, + String folderPath, + Comparator<SortKey> comparator, + Comparator<SortKey> streamSortComparator, + int minLinesToOutput) { + super(operatorContext, inputOperator, dataTypes, folderPath, comparator); + this.streamSortComparator = streamSortComparator; + this.minLinesToOutput = minLinesToOutput; + } + + @Override + public TsBlock next() throws Exception { + if (canStreamOutput || !tsBlockBuilder.isEmpty()) { + + buildResult(); + + // this time stream output data has been consumed up + if (!hasMoreSortedData()) { + canStreamOutput = false; + // clear spilled disk space and memory data structure + resetSortRelatedResource(); + } + + if (tsBlockBuilder.isFull() || consumedUp()) { + TsBlock res = tsBlockBuilder.build(); + remainingCount -= res.getPositionCount(); + tsBlockBuilder.reset(); + return res; + } + } + + if (currentTsBlock != null) { + cacheTsBlock(currentTsBlock); + currentTsBlock = null; + } + + long startTime = System.nanoTime(); + if (!inputOperator.hasNextWithTimer()) { + noMoreDataFromChild = true; + canStreamOutput = true; + } else { + try { + // init currentTsBlock from child operator + currentTsBlock = inputOperator.nextWithTimer(); + if (currentTsBlock == null || currentTsBlock.isEmpty()) { + currentTsBlock = null; + return null; + } + // record total sorted data size + dataSize += currentTsBlock.getRetainedSizeInBytes(); + + // if currentTsBlock line count + remainingCount is still less than minLinesToOutput, just + // cache it + if (currentTsBlock.getPositionCount() + remainingCount < minLinesToOutput) { + cacheTsBlock(currentTsBlock); + remainingCount += currentTsBlock.getPositionCount(); + currentTsBlock = null; + } else { + // if stream compare key of the last row of currentTsBlock is same as the last row of + // previous TsBlock, we cannot stream output, just cache it + if (isStreamCompareKeySame()) { + cacheTsBlock(currentTsBlock); + remainingCount += currentTsBlock.getPositionCount(); + currentTsBlock = null; + } else { + // traverse the current TsBlock backward until encountering a row with a StreamCompKey + // different from the last row, and return the index of that row + int endIndex = getEndIndexFromCurrentTsBlock(); + // if total count of `canOutput` lines is less than minLinesToOutput, we won't output + // them, we just cache the whole currentTsBlock + if (endIndex == -1 || endIndex + remainingCount + 1 < minLinesToOutput) { + cacheTsBlock(currentTsBlock); + remainingCount += currentTsBlock.getPositionCount(); + currentTsBlock = null; + } else { + // `canOutput` lines count is larger than minLinesToOutput, so we can stream output + cacheTsBlock(currentTsBlock.getRegion(0, endIndex + 1)); + remainingCount += currentTsBlock.getPositionCount(); + canStreamOutput = true; + currentTsBlock = currentTsBlock.subTsBlock(endIndex + 1); + } + } + } + } catch (IoTDBException e) { + clear(); + throw e; + } finally { + prepareUntilReadyCost += System.nanoTime() - startTime; + } + } + return null; + } + + private boolean consumedUp() { + return remainingCount == tsBlockBuilder.getPositionCount() && noMoreDataFromChild; + } + + // return -1, if not found + private int getEndIndexFromCurrentTsBlock() { + SortKey sortKeyOfLastRow = new SortKey(currentTsBlock, currentTsBlock.getPositionCount() - 1); + SortKey sortKeyOfFoundRow = new SortKey(currentTsBlock, currentTsBlock.getPositionCount() - 2); + for (int index = currentTsBlock.getPositionCount() - 2; index >= 0; index--) { + sortKeyOfFoundRow.rowIndex = index; + if (streamSortComparator.compare(sortKeyOfFoundRow, sortKeyOfLastRow) != 0) { + return index; + } + } + return -1; + } + + private boolean isStreamCompareKeySame() { + return lastRow == null + || streamSortComparator.compare( + lastRow, new SortKey(currentTsBlock, currentTsBlock.getPositionCount() - 1)) + == 0; + } + + protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException { + super.cacheTsBlock(tsBlock); + lastRow = new SortKey(tsBlock, tsBlock.getPositionCount() - 1); + } + + @Override + public boolean hasNext() throws Exception { + return super.hasNext() || !tsBlockBuilder.isEmpty() || currentTsBlock != null; + } + + @Override + public void close() throws Exception { + super.close(); + lastRow = null; + currentTsBlock = null; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + RamUsageEstimator.sizeOf(noMoreData) + + tsBlockBuilder.getRetainedSizeInBytes(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 73b333a8c86..d7d62ec2e86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator; import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; @@ -63,6 +64,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Expression; @@ -240,6 +242,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(context); scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); + scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); Expression pushDownPredicate = node.getPushDownPredicate(); @@ -711,4 +714,54 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } }); } + + @Override + public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + StreamSortNode.class.getSimpleName()); + List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + int sortItemsCount = node.getOrderingScheme().getOrderBy().size(); + + List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount); + List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount); + genSortInformation( + node.getOutputSymbols(), + node.getOrderingScheme(), + sortItemIndexList, + sortItemDataTypeList, + context.getTypeProvider()); + + String filePrefix = + IoTDBDescriptor.getInstance().getConfig().getSortTmpDir() + + File.separator + + operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId() + + File.separator + + operatorContext.getDriverContext().getPipelineId() + + File.separator; + + context.getDriverContext().setHaveTmpFile(true); + context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true); + + Operator child = node.getChild().accept(this, context); + + return new StreamSortOperator( + operatorContext, + child, + dataTypes, + filePrefix, + getComparatorForTable( + node.getOrderingScheme().getOrderingList(), sortItemIndexList, sortItemDataTypeList), + getComparatorForTable( + node.getOrderingScheme() + .getOrderingList() + .subList(0, node.getStreamCompareKeyEndIndex() + 1), + sortItemIndexList.subList(0, node.getStreamCompareKeyEndIndex() + 1), + sortItemDataTypeList.subList(0, node.getStreamCompareKeyEndIndex() + 1)), + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java index c39e4f6ea18..1bd6892541c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java @@ -37,24 +37,29 @@ public class SeriesScanOptions { private Filter globalTimeFilter; - private Filter pushDownFilter; + private final Filter pushDownFilter; private final long pushDownLimit; private final long pushDownOffset; private final Set<String> allSensors; + private final boolean pushLimitToEachDevice; + private PaginationController paginationController; + public SeriesScanOptions( Filter globalTimeFilter, Filter pushDownFilter, long pushDownLimit, long pushDownOffset, - Set<String> allSensors) { + Set<String> allSensors, + boolean pushLimitToEachDevice) { this.globalTimeFilter = globalTimeFilter; this.pushDownFilter = pushDownFilter; this.pushDownLimit = pushDownLimit; this.pushDownOffset = pushDownOffset; this.allSensors = allSensors; + this.pushLimitToEachDevice = pushLimitToEachDevice; } public static SeriesScanOptions getDefaultSeriesScanOptions(IFullPath seriesPath) { @@ -82,7 +87,14 @@ public class SeriesScanOptions { } public PaginationController getPaginationController() { - return new PaginationController(pushDownLimit, pushDownOffset); + if (pushLimitToEachDevice) { + return new PaginationController(pushDownLimit, pushDownOffset); + } else { + if (paginationController == null) { + paginationController = new PaginationController(pushDownLimit, pushDownOffset); + } + return paginationController; + } } public void setTTL(long dataTTL) { @@ -114,6 +126,8 @@ public class SeriesScanOptions { private Set<String> allSensors; + private boolean pushLimitToEachDevice = true; + public Builder withGlobalTimeFilter(Filter globalTimeFilter) { this.globalTimeFilter = globalTimeFilter; return this; @@ -134,13 +148,23 @@ public class SeriesScanOptions { return this; } + public Builder withPushLimitToEachDevice(boolean pushLimitToEachDevice) { + this.pushLimitToEachDevice = pushLimitToEachDevice; + return this; + } + public void withAllSensors(Set<String> allSensors) { this.allSensors = allSensors; } public SeriesScanOptions build() { return new SeriesScanOptions( - globalTimeFilter, pushDownFilter, pushDownLimit, pushDownOffset, allSensors); + globalTimeFilter, + pushDownFilter, + pushDownLimit, + pushDownOffset, + allSensors, + pushLimitToEachDevice); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java index c27b176bd9f..9a52f7464f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/DiskSpiller.java @@ -168,4 +168,8 @@ public class DiskSpiller { public int getFileSize() { return fileIndex; } + + public void reset() { + fileIndex = 0; + } }
