This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 82303b66e11 Add batch operators loading in the next method of TopKOperator
82303b66e11 is described below

commit 82303b66e118392c509d9f9c83cccfba7a7f90bf
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 19 10:33:46 2023 +0800

    Add batch operators loading in the next method of TopKOperator
---
 .../execution/operator/process/TopKOperator.java   | 145 +++++++++++++--------
 .../plan/planner/OperatorTreeGenerator.java        |   2 +-
 .../queryengine/plan/plan/PipelineBuilderTest.java |  91 +++++++++++++
 3 files changed, 184 insertions(+), 54 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index 40f271e2392..6cb7f6b64b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -42,15 +42,22 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
 
 public class TopKOperator implements ProcessOperator {
   private final OperatorContext operatorContext;
 
   private final List<Operator> deviceOperators;
   private int deviceIndex;
+  // maximum number of device operators processed per batch in each call to next()
+  private int deviceBatchStep;
+  private boolean[] canCallNext;
 
   private final List<TSDataType> dataTypes;
   private final TsBlockBuilder tsBlockBuilder;
@@ -88,7 +95,11 @@ public class TopKOperator implements ProcessOperator {
     this.tsBlockBuilder = new TsBlockBuilder(topValue, dataTypes);
     this.topValue = topValue;
     this.childrenDataInOrder = childrenDataInOrder;
+
     initResultTsBlock();
+
+    deviceBatchStep = 10000 % topValue == 0 ? 10000 / topValue : 10000 / 
topValue + 1;
+    canCallNext = new boolean[deviceOperators.size()];
   }
 
   @Override
@@ -98,14 +109,25 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    if (deviceIndex >= deviceOperators.size()) {
-      return NOT_BLOCKED;
-    }
-    ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
+    boolean hasReadyChild = false;
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+    for (int i = deviceIndex;
+        i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+        i++) {
+      if (getOperator(i) == null) {
+        continue;
+      }
+      ListenableFuture<?> blocked = getOperator(i).isBlocked();
+      if (blocked.isDone()) {
+        hasReadyChild = true;
+        canCallNext[i] = true;
+      } else {
+        listenableFutures.add(blocked);
+      }
     }
-    return NOT_BLOCKED;
+    return (hasReadyChild || listenableFutures.isEmpty())
+        ? NOT_BLOCKED
+        : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -115,7 +137,7 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    return !(deviceIndex >= deviceOperators.size() && resultReturnSize == 
topKResult.length);
+    return !(deviceIndex >= deviceOperators.size() && resultReturnSize >= 
topKResult.length);
   }
 
   @Override
@@ -124,42 +146,64 @@ public class TopKOperator implements ProcessOperator {
       return getResultFromCachedTopKResult();
     }
 
-    if (!getCurDeviceOperator().hasNextWithTimer()) {
-      closeCurDeviceOperator();
-      if (deviceIndex == deviceOperators.size()) {
-        return getResultFromMaxHeap(mergeSortHeap);
+    long startTime = System.nanoTime();
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+    boolean batchFinished = true;
+    int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, 
deviceOperators.size());
+    for (int i = deviceIndex; i < operatorBatchEnd; i++) {
+      if (getOperator(i) == null) {
+        continue;
       }
-      return null;
-    }
 
-    TsBlock currentTsBlock = getCurDeviceOperator().nextWithTimer();
-    if (currentTsBlock == null) {
-      return null;
-    }
+      if (!canCallNext[i]) {
+        batchFinished = false;
+        continue;
+      }
 
-    boolean skipCurrentBatch = false;
-    for (int idx = 0; idx < currentTsBlock.getPositionCount(); idx++) {
-      if (mergeSortHeap.getHeapSize() < topValue) {
-        updateTsBlockValue(currentTsBlock, idx, -1);
-      } else {
-        if (comparator.compare(new MergeSortKey(currentTsBlock, idx), 
mergeSortHeap.peek()) < 0) {
-          MergeSortKey peek = mergeSortHeap.poll();
-          updateTsBlockValue(currentTsBlock, idx, peek.rowIndex);
-        } else if (childrenDataInOrder) {
-          skipCurrentBatch = true;
-          break;
+      if (!getOperator(i).hasNextWithTimer()) {
+        closeOperator(i);
+        continue;
+      }
+
+      batchFinished = false;
+      TsBlock currentTsBlock = getOperator(i).nextWithTimer();
+      if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+        continue;
+      }
+
+      boolean skipCurrentBatch = false;
+      for (int vIdx = 0; vIdx < currentTsBlock.getPositionCount(); vIdx++) {
+        if (mergeSortHeap.getHeapSize() < topValue) {
+          updateTsBlockValue(currentTsBlock, vIdx, -1);
+        } else {
+          if (comparator.compare(new MergeSortKey(currentTsBlock, vIdx), 
mergeSortHeap.peek())
+              < 0) {
+            MergeSortKey peek = mergeSortHeap.poll();
+            updateTsBlockValue(currentTsBlock, vIdx, peek.rowIndex);
+          } else if (childrenDataInOrder) {
+            skipCurrentBatch = true;
+            break;
+          }
         }
       }
+      // if the current child's TsBlock has no value to put into the heap,
+      // the remaining data will also have no value to put into the heap
+      if (skipCurrentBatch) {
+        closeOperator(i);
+      }
+      canCallNext[i] = false;
+
+      if (System.nanoTime() - startTime > maxRuntime) {
+        break;
+      }
     }
 
-    // if current childIdx TsBlock has no value to put into heap
-    // the remaining data will also have no value to put int heap
-    if (skipCurrentBatch) {
-      closeCurDeviceOperator();
-      if (deviceIndex == deviceOperators.size()) {
-        return getResultFromMaxHeap(mergeSortHeap);
+    if (batchFinished) {
+      deviceIndex = deviceIndex + deviceBatchStep;
+      if (deviceIndex >= deviceOperators.size()) {
+        return getResultFromCachedTopKResult();
       }
-      return null;
     }
 
     return null;
@@ -235,17 +279,15 @@ public class TopKOperator implements ProcessOperator {
         new TsBlock(positionCount, new TimeColumn(positionCount, new 
long[positionCount]), columns);
   }
 
-  private TsBlock getResultFromMaxHeap(MergeSortHeap mergeSortHeap) {
-    int cnt = mergeSortHeap.getHeapSize();
-    topKResult = new MergeSortKey[cnt];
-    while (!mergeSortHeap.isEmpty()) {
-      topKResult[--cnt] = mergeSortHeap.poll();
+  private TsBlock getResultFromCachedTopKResult() {
+    if (mergeSortHeap.getHeapSize() > 0) {
+      int cnt = mergeSortHeap.getHeapSize();
+      topKResult = new MergeSortKey[cnt];
+      while (!mergeSortHeap.isEmpty()) {
+        topKResult[--cnt] = mergeSortHeap.poll();
+      }
     }
 
-    return getResultFromCachedTopKResult();
-  }
-
-  private TsBlock getResultFromCachedTopKResult() {
     tsBlockBuilder.reset();
     ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
     for (int i = resultReturnSize; i < topKResult.length; i++) {
@@ -309,15 +351,12 @@ public class TopKOperator implements ProcessOperator {
     mergeSortHeap.push(new MergeSortKey(tmpResultTsBlock, peekIndex));
   }
 
-  private Operator getCurDeviceOperator() {
-    return deviceOperators.get(deviceIndex);
+  private Operator getOperator(int i) {
+    return deviceOperators.get(i);
   }
 
-  private void closeCurDeviceOperator() throws Exception {
-    // close finished child
-    getCurDeviceOperator().close();
-    deviceOperators.set(deviceIndex, null);
-    // increment index, move to next child
-    deviceIndex++;
+  private void closeOperator(int i) throws Exception {
+    getOperator(i).close();
+    deviceOperators.set(i, null);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 6e086fac7ce..e0be33d076f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -856,7 +856,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 TopKOperator.class.getSimpleName());
     List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
     context.setCachedDataTypes(dataTypes);
-    List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, 
context);
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
     List<SortItem> sortItemList = 
node.getMergeOrderParameter().getSortItemList();
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
index 6ac9ade2501..d07ea63d43c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -45,13 +46,18 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -59,11 +65,13 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey.DEVICE;
 import static org.junit.Assert.assertEquals;
 
 public class PipelineBuilderTest {
@@ -376,6 +384,56 @@ public class PipelineBuilderTest {
     assertEquals(4, context.getExchangeSumNum());
   }
 
+  /**
+   * This test will test dop = 3. Expected result is three pipelines:
+   *
+   * <p>The first is: TopKNode1 - [SingleDeviceViewNode0, ExchangeOperator, 
ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SingleDeviceViewNode1.
+   *
+   * <p>The third is: ExchangeOperator - TopKNode1-1[SingleDeviceViewNode2, 
SingleDeviceViewNode3].
+   */
+  @Test
+  public void testTopKConsumeAllChildrenPipelineBuilder3() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TopKNode topKNode = initTopKNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(3);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(topKNode, 
context);
+    // The number of pipelines is 2, since the parent pipeline hasn't joined
+    assertEquals(2, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(3, childrenOperator.size());
+    assertEquals(SingleDeviceViewOperator.class, 
childrenOperator.get(0).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+    // Validate the changes of node structure
+    assertEquals(3, topKNode.getChildren().size());
+    assertEquals("Time", 
topKNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals("Time", 
topKNode.getChildren().get(1).getOutputColumnNames().get(0));
+    assertEquals("Time", 
topKNode.getChildren().get(2).getOutputColumnNames().get(0));
+    assertEquals(TopKNode.class, topKNode.getChildren().get(2).getClass());
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("SingleDeviceViewNode1", 
exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    TopKNode subTimeJoinNode = (TopKNode) topKNode.getChildren().get(2);
+    assertEquals(2, subTimeJoinNode.getChildren().size());
+    assertEquals("Time", 
subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals("Time", 
subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals(exchangeOperator2.getSourceId(), 
subTimeJoinNode.getPlanNodeId());
+
+    // Validate the number of exchange operators
+    assertEquals(2, context.getExchangeSumNum());
+  }
+
   /**
    * The operator structure is [DeviceView - 
[SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
    *
@@ -855,4 +913,37 @@ public class PipelineBuilderTest {
     }
     return deviceViewNode;
   }
+
+  private TopKNode initTopKNode(TypeProvider typeProvider, int childNum)
+      throws IllegalPathException {
+    TopKNode topKNode =
+        new TopKNode(
+            new PlanNodeId("TopKNode"),
+            10,
+            new OrderByParameter(
+                Arrays.asList(
+                    new SortItem(OrderByKey.TIME, Ordering.ASC),
+                    new SortItem(DEVICE, Ordering.ASC))),
+            Arrays.asList("Time", "Device", "s1"));
+    for (int i = 0; i < childNum; i++) {
+      SingleDeviceViewNode singleDeviceViewNode =
+          new SingleDeviceViewNode(
+              new PlanNodeId(String.format("SingleDeviceViewNode%d", i)),
+              Arrays.asList("Time", "Device", "s1"),
+              "root.sg.d" + i,
+              Arrays.asList(0, 1, 2));
+      singleDeviceViewNode.setCacheOutputColumnNames(true);
+      SeriesScanNode seriesScanNode =
+          new SeriesScanNode(
+              new PlanNodeId(String.format("SeriesScanNode%d", i)),
+              new MeasurementPath(String.format("root.sg.d%d.s1", i), 
TSDataType.INT32));
+      typeProvider.setType(seriesScanNode.getSeriesPath().toString(), 
TSDataType.INT32);
+      singleDeviceViewNode.addChild(seriesScanNode);
+      typeProvider.setType("Time", TSDataType.INT64);
+      typeProvider.setType("Device", TSDataType.TEXT);
+      typeProvider.setType("s1", TSDataType.INT32);
+      topKNode.addChild(singleDeviceViewNode);
+    }
+    return topKNode;
+  }
 }

Reply via email to