This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/optimize_order_by in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit faa302fb2d61376bb3b7baff32cbcc02d8da60b8 Author: Beyyes <[email protected]> AuthorDate: Tue Oct 17 23:32:20 2023 +0800 perfect TopKOperator impl --- .../execution/operator/process/TopKOperator.java | 132 +++++++++++++-------- 1 file changed, 80 insertions(+), 52 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java index 40f271e2392..692a2ea53c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java @@ -42,15 +42,21 @@ import org.apache.iotdb.tsfile.utils.Binary; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Optional; +import static com.google.common.util.concurrent.Futures.successfulAsList; + public class TopKOperator implements ProcessOperator { private final OperatorContext operatorContext; private final List<Operator> deviceOperators; private int deviceIndex; + // read step operators each invoking + private int deviceBatchStep; + private boolean[] canCallNext; private final List<TSDataType> dataTypes; private final TsBlockBuilder tsBlockBuilder; @@ -88,7 +94,14 @@ public class TopKOperator implements ProcessOperator { this.tsBlockBuilder = new TsBlockBuilder(topValue, dataTypes); this.topValue = topValue; this.childrenDataInOrder = childrenDataInOrder; + initResultTsBlock(); + + // activate each FI, to improve FI parallelism + deviceOperators.forEach(Operator::isBlocked); + + deviceBatchStep = 10000 % topValue == 0 ? 10000 / topValue : 10000 / topValue + 1; + canCallNext = new boolean[deviceOperators.size()]; } @Override @@ -98,14 +111,25 @@ public class TopKOperator implements ProcessOperator { @Override public ListenableFuture<?> isBlocked() { - if (deviceIndex >= deviceOperators.size()) { - return NOT_BLOCKED; - } - ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked(); - if (!blocked.isDone()) { - return blocked; + boolean hasReadyChild = false; + List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); + for (int i = deviceIndex; + i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size()); + i++) { + if (getOperator(i) == null) { + continue; + } + ListenableFuture<?> blocked = getOperator(i).isBlocked(); + if (blocked.isDone()) { + hasReadyChild = true; + canCallNext[i] = true; + } else { + listenableFutures.add(blocked); + } } - return NOT_BLOCKED; + return (hasReadyChild || listenableFutures.isEmpty()) + ? NOT_BLOCKED + : successfulAsList(listenableFutures); } @Override @@ -124,42 +148,51 @@ public class TopKOperator implements ProcessOperator { return getResultFromCachedTopKResult(); } - if (!getCurDeviceOperator().hasNextWithTimer()) { - closeCurDeviceOperator(); - if (deviceIndex == deviceOperators.size()) { - return getResultFromMaxHeap(mergeSortHeap); + boolean batchFinished = true; + int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, deviceOperators.size()); + for (int i = deviceIndex; i < operatorBatchEnd; i++) { + if (getOperator(i) == null || !canCallNext[i]) { + continue; } - return null; - } - TsBlock currentTsBlock = getCurDeviceOperator().nextWithTimer(); - if (currentTsBlock == null) { - return null; - } + if (!getOperator(i).hasNextWithTimer()) { + closeOperator(i); + continue; + } - boolean skipCurrentBatch = false; - for (int idx = 0; idx < currentTsBlock.getPositionCount(); idx++) { - if (mergeSortHeap.getHeapSize() < topValue) { - updateTsBlockValue(currentTsBlock, idx, -1); - } else { - if (comparator.compare(new MergeSortKey(currentTsBlock, idx), mergeSortHeap.peek()) < 0) { - MergeSortKey peek = mergeSortHeap.poll(); - updateTsBlockValue(currentTsBlock, idx, peek.rowIndex); - } else if (childrenDataInOrder) { - skipCurrentBatch = true; - break; + batchFinished = false; + TsBlock currentTsBlock = getOperator(i).nextWithTimer(); + if (currentTsBlock == null || currentTsBlock.isEmpty()) { + continue; + } + + boolean skipCurrentBatch = false; + for (int vIdx = 0; vIdx < currentTsBlock.getPositionCount(); vIdx++) { + if (mergeSortHeap.getHeapSize() < topValue) { + updateTsBlockValue(currentTsBlock, vIdx, -1); + } else { + if (comparator.compare(new MergeSortKey(currentTsBlock, vIdx), mergeSortHeap.peek()) + < 0) { + MergeSortKey peek = mergeSortHeap.poll(); + updateTsBlockValue(currentTsBlock, vIdx, peek.rowIndex); + } else if (childrenDataInOrder) { + skipCurrentBatch = true; + break; + } } } + // if current childIdx TsBlock has no value to put into heap + // the remaining data will also have no value to put int heap + if (skipCurrentBatch) { + closeOperator(i); + } } - // if current childIdx TsBlock has no value to put into heap - // the remaining data will also have no value to put int heap - if (skipCurrentBatch) { - closeCurDeviceOperator(); - if (deviceIndex == deviceOperators.size()) { - return getResultFromMaxHeap(mergeSortHeap); + if (batchFinished) { + deviceIndex = deviceIndex + deviceBatchStep; + if (deviceIndex >= deviceOperators.size()) { + return getResultFromCachedTopKResult(); } - return null; } return null; @@ -235,17 +268,15 @@ public class TopKOperator implements ProcessOperator { new TsBlock(positionCount, new TimeColumn(positionCount, new long[positionCount]), columns); } - private TsBlock getResultFromMaxHeap(MergeSortHeap mergeSortHeap) { - int cnt = mergeSortHeap.getHeapSize(); - topKResult = new MergeSortKey[cnt]; - while (!mergeSortHeap.isEmpty()) { - topKResult[--cnt] = mergeSortHeap.poll(); + private TsBlock getResultFromCachedTopKResult() { + if (mergeSortHeap.getHeapSize() > 0) { + int cnt = mergeSortHeap.getHeapSize(); + topKResult = new MergeSortKey[cnt]; + while (!mergeSortHeap.isEmpty()) { + topKResult[--cnt] = mergeSortHeap.poll(); + } } - return getResultFromCachedTopKResult(); - } - - private TsBlock getResultFromCachedTopKResult() { tsBlockBuilder.reset(); ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); for (int i = resultReturnSize; i < topKResult.length; i++) { @@ -309,15 +340,12 @@ public class TopKOperator implements ProcessOperator { mergeSortHeap.push(new MergeSortKey(tmpResultTsBlock, peekIndex)); } - private Operator getCurDeviceOperator() { - return deviceOperators.get(deviceIndex); + private Operator getOperator(int i) { + return deviceOperators.get(i); } - private void closeCurDeviceOperator() throws Exception { - // close finished child - getCurDeviceOperator().close(); - deviceOperators.set(deviceIndex, null); - // increment index, move to next child - deviceIndex++; + private void closeOperator(int i) throws Exception { + getOperator(i).close(); + deviceOperators.set(i, null); } }
