This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/optimize_order_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit faa302fb2d61376bb3b7baff32cbcc02d8da60b8
Author: Beyyes <[email protected]>
AuthorDate: Tue Oct 17 23:32:20 2023 +0800

    Improve TopKOperator implementation
---
 .../execution/operator/process/TopKOperator.java   | 132 +++++++++++++--------
 1 file changed, 80 insertions(+), 52 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index 40f271e2392..692a2ea53c4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -42,15 +42,21 @@ import org.apache.iotdb.tsfile.utils.Binary;

 import com.google.common.util.concurrent.ListenableFuture;

+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;

+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public class TopKOperator implements ProcessOperator {
   private final OperatorContext operatorContext;

   private final List<Operator> deviceOperators;
   private int deviceIndex;
+  // number of child operators handled per batch by isBlocked() and next()
+  private int deviceBatchStep;
+  private boolean[] canCallNext; // canCallNext[i]: isBlocked() saw child i ready

   private final List<TSDataType> dataTypes;
   private final TsBlockBuilder tsBlockBuilder;
@@ -88,7 +94,15 @@ public class TopKOperator implements ProcessOperator {
     this.tsBlockBuilder = new TsBlockBuilder(topValue, dataTypes);
     this.topValue = topValue;
     this.childrenDataInOrder = childrenDataInOrder;
+
     initResultTsBlock();
+
+    // call isBlocked() on all children up front to activate each FI early, improving FI parallelism
+    deviceOperators.forEach(Operator::isBlocked);
+
+    // ceil(10000 / topValue): how many children isBlocked() and next() handle per batch
+    deviceBatchStep = 10000 % topValue == 0 ? 10000 / topValue : 10000 / topValue + 1;
+    canCallNext = new boolean[deviceOperators.size()];
   }

   @Override
@@ -98,14 +111,25 @@ public class TopKOperator implements ProcessOperator {

   @Override
   public ListenableFuture<?> isBlocked() {
-    if (deviceIndex >= deviceOperators.size()) {
-      return NOT_BLOCKED;
-    }
-    ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
+    boolean hasReadyChild = false; // some live child in the current batch is ready
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); // still-blocked children
+    for (int i = deviceIndex;
+        i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+        i++) {
+      if (getOperator(i) == null) { // child already closed
+        continue;
+      }
+      ListenableFuture<?> blocked = getOperator(i).isBlocked();
+      if (blocked.isDone()) {
+        hasReadyChild = true;
+        canCallNext[i] = true; // lets next() read child i
+      } else {
+        listenableFutures.add(blocked);
+      }
     }
-    return NOT_BLOCKED;
+    return (hasReadyChild || listenableFutures.isEmpty())
+        ? NOT_BLOCKED
+        : successfulAsList(listenableFutures); // NOTE(review): done only once ALL pending are done
   }

   @Override
@@ -124,42 +148,62 @@ public class TopKOperator implements ProcessOperator {
       return getResultFromCachedTopKResult();
     }

-    if (!getCurDeviceOperator().hasNextWithTimer()) {
-      closeCurDeviceOperator();
-      if (deviceIndex == deviceOperators.size()) {
-        return getResultFromMaxHeap(mergeSortHeap);
+    // Drain every child of the current batch that isBlocked() reported as ready. The batch is
+    // finished once a pass finds each of its children already closed or exhausted.
+    boolean batchFinished = true;
+    int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+    for (int i = deviceIndex; i < operatorBatchEnd; i++) {
+      if (getOperator(i) == null) {
+        continue;
+      }
+      if (!canCallNext[i]) {
+        // A live child that was not ready keeps the batch open; otherwise deviceIndex would
+        // move past it and its remaining data would never be read.
+        batchFinished = false;
+        continue;
       }
-      return null;
-    }

-    TsBlock currentTsBlock = getCurDeviceOperator().nextWithTimer();
-    if (currentTsBlock == null) {
-      return null;
-    }
+      if (!getOperator(i).hasNextWithTimer()) {
+        closeOperator(i);
+        continue;
+      }

-    boolean skipCurrentBatch = false;
-    for (int idx = 0; idx < currentTsBlock.getPositionCount(); idx++) {
-      if (mergeSortHeap.getHeapSize() < topValue) {
-        updateTsBlockValue(currentTsBlock, idx, -1);
-      } else {
-        if (comparator.compare(new MergeSortKey(currentTsBlock, idx), mergeSortHeap.peek()) < 0) {
-          MergeSortKey peek = mergeSortHeap.poll();
-          updateTsBlockValue(currentTsBlock, idx, peek.rowIndex);
-        } else if (childrenDataInOrder) {
-          skipCurrentBatch = true;
-          break;
+      batchFinished = false;
+      TsBlock currentTsBlock = getOperator(i).nextWithTimer();
+      // The readiness seen by isBlocked() is now consumed; require a fresh isBlocked() check
+      // before calling next on this child again.
+      canCallNext[i] = false;
+      if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+        continue;
+      }
+
+      boolean skipCurrentBatch = false;
+      for (int vIdx = 0; vIdx < currentTsBlock.getPositionCount(); vIdx++) {
+        if (mergeSortHeap.getHeapSize() < topValue) {
+          updateTsBlockValue(currentTsBlock, vIdx, -1);
+        } else {
+          if (comparator.compare(new MergeSortKey(currentTsBlock, vIdx), mergeSortHeap.peek())
+              < 0) {
+            MergeSortKey peek = mergeSortHeap.poll();
+            updateTsBlockValue(currentTsBlock, vIdx, peek.rowIndex);
+          } else if (childrenDataInOrder) {
+            skipCurrentBatch = true;
+            break;
+          }
         }
       }
+      // With ordered children, a row that cannot enter the heap means none of this child's
+      // remaining rows can either, so the child is closed early.
+      if (skipCurrentBatch) {
+        closeOperator(i);
+      }
     }

-    // if current childIdx TsBlock has no value to put into heap
-    // the remaining data will also have no value to put int heap
-    if (skipCurrentBatch) {
-      closeCurDeviceOperator();
-      if (deviceIndex == deviceOperators.size()) {
-        return getResultFromMaxHeap(mergeSortHeap);
+    if (batchFinished) {
+      deviceIndex = deviceIndex + deviceBatchStep;
+      if (deviceIndex >= deviceOperators.size()) {
+        return getResultFromCachedTopKResult();
       }
-      return null;
     }

     return null;
@@ -235,17 +268,17 @@ public class TopKOperator implements ProcessOperator {
         new TsBlock(positionCount, new TimeColumn(positionCount, new long[positionCount]), columns);
   }

-  private TsBlock getResultFromMaxHeap(MergeSortHeap mergeSortHeap) {
-    int cnt = mergeSortHeap.getHeapSize();
-    topKResult = new MergeSortKey[cnt];
-    while (!mergeSortHeap.isEmpty()) {
-      topKResult[--cnt] = mergeSortHeap.poll();
+  private TsBlock getResultFromCachedTopKResult() {
+    // Drain the heap into topKResult (filled back to front) if it has not been built yet or the
+    // heap still holds rows. Running it for an empty heap too keeps topKResult non-null below.
+    if (topKResult == null || mergeSortHeap.getHeapSize() > 0) {
+      int cnt = mergeSortHeap.getHeapSize();
+      topKResult = new MergeSortKey[cnt];
+      while (!mergeSortHeap.isEmpty()) {
+        topKResult[--cnt] = mergeSortHeap.poll();
+      }
     }

-    return getResultFromCachedTopKResult();
-  }
-
-  private TsBlock getResultFromCachedTopKResult() {
     tsBlockBuilder.reset();
     ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
     for (int i = resultReturnSize; i < topKResult.length; i++) {
@@ -309,15 +340,12 @@ public class TopKOperator implements ProcessOperator {
     mergeSortHeap.push(new MergeSortKey(tmpResultTsBlock, peekIndex));
   }

-  private Operator getCurDeviceOperator() {
-    return deviceOperators.get(deviceIndex);
+  private Operator getOperator(int i) {
+    return deviceOperators.get(i); // null once closeOperator(i) has run
   }

-  private void closeCurDeviceOperator() throws Exception {
-    // close finished child
-    getCurDeviceOperator().close();
-    deviceOperators.set(deviceIndex, null);
-    // increment index, move to next child
-    deviceIndex++;
+  private void closeOperator(int i) throws Exception {
+    getOperator(i).close(); // close the finished child
+    deviceOperators.set(i, null); // mark slot i closed; the batch loops skip null children
   }
 }

Reply via email to