This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cf1cffa7f9bad2b738a27d902a07e4a4b32558d3 Author: Minghui Liu <[email protected]> AuthorDate: Mon Nov 14 15:04:54 2022 +0800 modify WindowSplitOperator --- .../main/java/org/apache/iotdb/SessionExample.java | 4 +-- .../operator/process/WindowConcatOperator.java | 14 ++++----- .../operator/process/WindowSplitOperator.java | 33 ++++++++++++++++------ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 17 +++++++++-- 4 files changed, 49 insertions(+), 19 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 8760400c91..9baf3a8ad2 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -73,9 +73,9 @@ public class SessionExample { session.setFetchSize(10000); List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1"); - List<Integer> indexes = Arrays.asList(1, 3, 5, 7); + List<Integer> indexes = Arrays.asList(0, 1, 2, 3); List<SessionDataSet> windowBatch = - session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes); + session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes); for (SessionDataSet window : windowBatch) { System.out.println(window.getColumnNames()); while (window.hasNext()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java index 1f40016f9a..269b579feb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java @@ -55,36 +55,36 @@ public class WindowConcatOperator implements ProcessOperator { @Override public OperatorContext getOperatorContext() { - return null; + return operatorContext; } 
@Override public TsBlock next() { - return null; + return child.next(); } @Override public boolean hasNext() { - return false; + return child.hasNext(); } @Override public boolean isFinished() { - return false; + return child.isFinished(); } @Override public long calculateMaxPeekMemory() { - return 0; + return child.calculateMaxPeekMemory(); } @Override public long calculateMaxReturnSize() { - return 0; + return child.calculateMaxReturnSize(); } @Override public long calculateRetainedSizeAfterCallingNext() { - return 0; + return child.calculateRetainedSizeAfterCallingNext(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java index f2ee804f75..6b9544b1a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java @@ -42,18 +42,23 @@ public class WindowSplitOperator implements ProcessOperator { protected TsBlock inputTsBlock; protected boolean canCallNext; - private final ITimeRangeIterator sampleTimeRangeSliceIterator; + private final ITimeRangeIterator sampleTimeRangeIterator; private TimeRange curTimeRange; + private final ITimeRangeIterator sampleTimeRangeSliceIterator; + private TimeRange curTimeRangeSlice; + private final TsBlockBuilder resultTsBlockBuilder; public WindowSplitOperator( OperatorContext operatorContext, Operator child, + ITimeRangeIterator sampleTimeRangeIterator, ITimeRangeIterator sampleTimeRangeSliceIterator, List<TSDataType> outputDataTypes) { this.operatorContext = operatorContext; this.child = child; + this.sampleTimeRangeIterator = sampleTimeRangeIterator; this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator; this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); } @@ -73,15 +78,27 @@ public class 
WindowSplitOperator implements ProcessOperator { // reset operator state canCallNext = true; - if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) { - // move to next time window - curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange(); + if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) { + curTimeRange = sampleTimeRangeIterator.nextTimeRange(); + } + + while (curTimeRangeSlice == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) { + curTimeRangeSlice = sampleTimeRangeSliceIterator.nextTimeRange(); + if (curTimeRangeSlice.getMin() > curTimeRange.getMax()) { + if (sampleTimeRangeIterator.hasNextTimeRange()) { + curTimeRange = sampleTimeRangeIterator.nextTimeRange(); + } + if (curTimeRangeSlice.getMin() > curTimeRange.getMax() + || curTimeRangeSlice.getMax() < curTimeRange.getMin()) { + curTimeRangeSlice = null; + } + } } if (!fetchData()) { return null; } else { - curTimeRange = null; + curTimeRangeSlice = null; TsBlock resultTsBlock = resultTsBlockBuilder.build(); resultTsBlockBuilder.reset(); return resultTsBlock; @@ -106,14 +123,14 @@ public class WindowSplitOperator implements ProcessOperator { return false; } - inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true); + inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRangeSlice, true); if (inputTsBlock == null) { return false; } for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) { long time = inputTsBlock.getTimeByIndex(readIndex); - if (curTimeRange.contains(time)) { + if (curTimeRangeSlice.contains(time)) { writeData(readIndex); } else { inputTsBlock = inputTsBlock.subTsBlock(readIndex); @@ -135,7 +152,7 @@ public class WindowSplitOperator implements ProcessOperator { @Override public boolean hasNext() { - return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange(); + return curTimeRangeSlice != null || 
sampleTimeRangeSliceIterator.hasNextTimeRange(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 888c3f94a7..d839451544 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1592,7 +1592,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP WindowSplitOperator.class.getSimpleName()); GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); - ITimeRangeIterator timeRangeIterator = + ITimeRangeIterator sampleTimeRangeIterator = + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + groupByTimeParameter.getStartTime(), + groupByTimeParameter.getEndTime(), + groupByTimeParameter.getInterval(), + groupByTimeParameter.getSlidingStep(), + node.getSamplingIndexes(), + false); + ITimeRangeIterator sampleTimeRangeSliceIterator = TimeRangeIteratorFactory.getSampleTimeRangeIterator( groupByTimeParameter.getStartTime(), groupByTimeParameter.getEndTime(), @@ -1604,7 +1612,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes); + return new WindowSplitOperator( + operatorContext, + child, + sampleTimeRangeIterator, + sampleTimeRangeSliceIterator, + outputDataTypes); } @Override
