This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 82303b66e11 Add batch operators loading in the next method of
TopKOperator
82303b66e11 is described below
commit 82303b66e118392c509d9f9c83cccfba7a7f90bf
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 19 10:33:46 2023 +0800
Add batch operators loading in the next method of TopKOperator
---
.../execution/operator/process/TopKOperator.java | 145 +++++++++++++--------
.../plan/planner/OperatorTreeGenerator.java | 2 +-
.../queryengine/plan/plan/PipelineBuilderTest.java | 91 +++++++++++++
3 files changed, 184 insertions(+), 54 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index 40f271e2392..6cb7f6b64b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -42,15 +42,22 @@ import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
public class TopKOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private final List<Operator> deviceOperators;
private int deviceIndex;
+ // number of device operators examined per batch in isBlocked() and next()
+ private int deviceBatchStep;
+ private boolean[] canCallNext;
private final List<TSDataType> dataTypes;
private final TsBlockBuilder tsBlockBuilder;
@@ -88,7 +95,11 @@ public class TopKOperator implements ProcessOperator {
this.tsBlockBuilder = new TsBlockBuilder(topValue, dataTypes);
this.topValue = topValue;
this.childrenDataInOrder = childrenDataInOrder;
+
initResultTsBlock();
+
+ deviceBatchStep = 10000 % topValue == 0 ? 10000 / topValue : 10000 /
topValue + 1;
+ canCallNext = new boolean[deviceOperators.size()];
}
@Override
@@ -98,14 +109,25 @@ public class TopKOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
- if (deviceIndex >= deviceOperators.size()) {
- return NOT_BLOCKED;
- }
- ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
- if (!blocked.isDone()) {
- return blocked;
+ boolean hasReadyChild = false;
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+ for (int i = deviceIndex;
+ i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+ i++) {
+ if (getOperator(i) == null) {
+ continue;
+ }
+ ListenableFuture<?> blocked = getOperator(i).isBlocked();
+ if (blocked.isDone()) {
+ hasReadyChild = true;
+ canCallNext[i] = true;
+ } else {
+ listenableFutures.add(blocked);
+ }
}
- return NOT_BLOCKED;
+ return (hasReadyChild || listenableFutures.isEmpty())
+ ? NOT_BLOCKED
+ : successfulAsList(listenableFutures);
}
@Override
@@ -115,7 +137,7 @@ public class TopKOperator implements ProcessOperator {
@Override
public boolean hasNext() throws Exception {
- return !(deviceIndex >= deviceOperators.size() && resultReturnSize ==
topKResult.length);
+ return !(deviceIndex >= deviceOperators.size() && resultReturnSize >=
topKResult.length);
}
@Override
@@ -124,42 +146,64 @@ public class TopKOperator implements ProcessOperator {
return getResultFromCachedTopKResult();
}
- if (!getCurDeviceOperator().hasNextWithTimer()) {
- closeCurDeviceOperator();
- if (deviceIndex == deviceOperators.size()) {
- return getResultFromMaxHeap(mergeSortHeap);
+ long startTime = System.nanoTime();
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+ boolean batchFinished = true;
+ int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep,
deviceOperators.size());
+ for (int i = deviceIndex; i < operatorBatchEnd; i++) {
+ if (getOperator(i) == null) {
+ continue;
}
- return null;
- }
- TsBlock currentTsBlock = getCurDeviceOperator().nextWithTimer();
- if (currentTsBlock == null) {
- return null;
- }
+ if (!canCallNext[i]) {
+ batchFinished = false;
+ continue;
+ }
- boolean skipCurrentBatch = false;
- for (int idx = 0; idx < currentTsBlock.getPositionCount(); idx++) {
- if (mergeSortHeap.getHeapSize() < topValue) {
- updateTsBlockValue(currentTsBlock, idx, -1);
- } else {
- if (comparator.compare(new MergeSortKey(currentTsBlock, idx),
mergeSortHeap.peek()) < 0) {
- MergeSortKey peek = mergeSortHeap.poll();
- updateTsBlockValue(currentTsBlock, idx, peek.rowIndex);
- } else if (childrenDataInOrder) {
- skipCurrentBatch = true;
- break;
+ if (!getOperator(i).hasNextWithTimer()) {
+ closeOperator(i);
+ continue;
+ }
+
+ batchFinished = false;
+ TsBlock currentTsBlock = getOperator(i).nextWithTimer();
+ if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+ continue;
+ }
+
+ boolean skipCurrentBatch = false;
+ for (int vIdx = 0; vIdx < currentTsBlock.getPositionCount(); vIdx++) {
+ if (mergeSortHeap.getHeapSize() < topValue) {
+ updateTsBlockValue(currentTsBlock, vIdx, -1);
+ } else {
+ if (comparator.compare(new MergeSortKey(currentTsBlock, vIdx),
mergeSortHeap.peek())
+ < 0) {
+ MergeSortKey peek = mergeSortHeap.poll();
+ updateTsBlockValue(currentTsBlock, vIdx, peek.rowIndex);
+ } else if (childrenDataInOrder) {
+ skipCurrentBatch = true;
+ break;
+ }
}
}
+ // if the current child's TsBlock has no value to put into the heap,
+ // the remaining data will also have no value to put into the heap
+ if (skipCurrentBatch) {
+ closeOperator(i);
+ }
+ canCallNext[i] = false;
+
+ if (System.nanoTime() - startTime > maxRuntime) {
+ break;
+ }
}
- // if current childIdx TsBlock has no value to put into heap
- // the remaining data will also have no value to put int heap
- if (skipCurrentBatch) {
- closeCurDeviceOperator();
- if (deviceIndex == deviceOperators.size()) {
- return getResultFromMaxHeap(mergeSortHeap);
+ if (batchFinished) {
+ deviceIndex = deviceIndex + deviceBatchStep;
+ if (deviceIndex >= deviceOperators.size()) {
+ return getResultFromCachedTopKResult();
}
- return null;
}
return null;
@@ -235,17 +279,15 @@ public class TopKOperator implements ProcessOperator {
new TsBlock(positionCount, new TimeColumn(positionCount, new
long[positionCount]), columns);
}
- private TsBlock getResultFromMaxHeap(MergeSortHeap mergeSortHeap) {
- int cnt = mergeSortHeap.getHeapSize();
- topKResult = new MergeSortKey[cnt];
- while (!mergeSortHeap.isEmpty()) {
- topKResult[--cnt] = mergeSortHeap.poll();
+ private TsBlock getResultFromCachedTopKResult() {
+ if (mergeSortHeap.getHeapSize() > 0) {
+ int cnt = mergeSortHeap.getHeapSize();
+ topKResult = new MergeSortKey[cnt];
+ while (!mergeSortHeap.isEmpty()) {
+ topKResult[--cnt] = mergeSortHeap.poll();
+ }
}
- return getResultFromCachedTopKResult();
- }
-
- private TsBlock getResultFromCachedTopKResult() {
tsBlockBuilder.reset();
ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
for (int i = resultReturnSize; i < topKResult.length; i++) {
@@ -309,15 +351,12 @@ public class TopKOperator implements ProcessOperator {
mergeSortHeap.push(new MergeSortKey(tmpResultTsBlock, peekIndex));
}
- private Operator getCurDeviceOperator() {
- return deviceOperators.get(deviceIndex);
+ private Operator getOperator(int i) {
+ return deviceOperators.get(i);
}
- private void closeCurDeviceOperator() throws Exception {
- // close finished child
- getCurDeviceOperator().close();
- deviceOperators.set(deviceIndex, null);
- // increment index, move to next child
- deviceIndex++;
+ private void closeOperator(int i) throws Exception {
+ getOperator(i).close();
+ deviceOperators.set(i, null);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 6e086fac7ce..e0be33d076f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -856,7 +856,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
TopKOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
context.setCachedDataTypes(dataTypes);
- List<Operator> children = dealWithConsumeChildrenOneByOneNode(node,
context);
+ List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
index 6ac9ade2501..d07ea63d43c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -45,13 +46,18 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -59,11 +65,13 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey.DEVICE;
import static org.junit.Assert.assertEquals;
public class PipelineBuilderTest {
@@ -376,6 +384,56 @@ public class PipelineBuilderTest {
assertEquals(4, context.getExchangeSumNum());
}
+ /**
+ * This test will test dop = 3. Expected result is three pipelines:
+ *
+ * <p>The first is: TopKNode1 - [SingleDeviceViewNode0, ExchangeOperator,
ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SingleDeviceViewNode1.
+ *
+ * <p>The third is: ExchangeOperator - TopKNode1-1[SingleDeviceViewNode2,
SingleDeviceViewNode3].
+ */
+ @Test
+ public void testTopKConsumeAllChildrenPipelineBuilder3() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TopKNode topKNode = initTopKNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(3);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(topKNode,
context);
+ // The number of pipelines is 2, since the parent pipeline hasn't joined yet
+ assertEquals(2, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(3, childrenOperator.size());
+ assertEquals(SingleDeviceViewOperator.class,
childrenOperator.get(0).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+ // Validate the changes of node structure
+ assertEquals(3, topKNode.getChildren().size());
+ assertEquals("Time",
topKNode.getChildren().get(0).getOutputColumnNames().get(0));
+ assertEquals("Time",
topKNode.getChildren().get(1).getOutputColumnNames().get(0));
+ assertEquals("Time",
topKNode.getChildren().get(2).getOutputColumnNames().get(0));
+ assertEquals(TopKNode.class, topKNode.getChildren().get(2).getClass());
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("SingleDeviceViewNode1",
exchangeOperator1.getSourceId().getId());
+
+ // Validate the third pipeline
+ TopKNode subTimeJoinNode = (TopKNode) topKNode.getChildren().get(2);
+ assertEquals(2, subTimeJoinNode.getChildren().size());
+ assertEquals("Time",
subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+ assertEquals("Time",
subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals(exchangeOperator2.getSourceId(),
subTimeJoinNode.getPlanNodeId());
+
+ // Validate the number of exchange operators
+ assertEquals(2, context.getExchangeSumNum());
+ }
+
/**
* The operator structure is [DeviceView -
[SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
*
@@ -855,4 +913,37 @@ public class PipelineBuilderTest {
}
return deviceViewNode;
}
+
+ private TopKNode initTopKNode(TypeProvider typeProvider, int childNum)
+ throws IllegalPathException {
+ TopKNode topKNode =
+ new TopKNode(
+ new PlanNodeId("TopKNode"),
+ 10,
+ new OrderByParameter(
+ Arrays.asList(
+ new SortItem(OrderByKey.TIME, Ordering.ASC),
+ new SortItem(DEVICE, Ordering.ASC))),
+ Arrays.asList("Time", "Device", "s1"));
+ for (int i = 0; i < childNum; i++) {
+ SingleDeviceViewNode singleDeviceViewNode =
+ new SingleDeviceViewNode(
+ new PlanNodeId(String.format("SingleDeviceViewNode%d", i)),
+ Arrays.asList("Time", "Device", "s1"),
+ "root.sg.d" + i,
+ Arrays.asList(0, 1, 2));
+ singleDeviceViewNode.setCacheOutputColumnNames(true);
+ SeriesScanNode seriesScanNode =
+ new SeriesScanNode(
+ new PlanNodeId(String.format("SeriesScanNode%d", i)),
+ new MeasurementPath(String.format("root.sg.d%d.s1", i),
TSDataType.INT32));
+ typeProvider.setType(seriesScanNode.getSeriesPath().toString(),
TSDataType.INT32);
+ singleDeviceViewNode.addChild(seriesScanNode);
+ typeProvider.setType("Time", TSDataType.INT64);
+ typeProvider.setType("Device", TSDataType.TEXT);
+ typeProvider.setType("s1", TSDataType.INT32);
+ topKNode.addChild(singleDeviceViewNode);
+ }
+ return topKNode;
+ }
}