This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e6c69abadc25230ada942ccec940dee9497451a2 Author: Minghui Liu <[email protected]> AuthorDate: Tue Nov 1 16:21:48 2022 +0800 implement operator --- .../timerangeiterator/SampleWindowIterator.java | 87 +++++++++++ .../TimeRangeIteratorFactory.java | 11 ++ .../operator/process/WindowSplitOperator.java | 160 +++++++++++++++++++++ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 29 ++++ .../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 + .../statement/crud/FetchWindowSetStatement.java | 7 +- 6 files changed, 298 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java new file mode 100644 index 0000000000..396d53d96b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.aggregation.timerangeiterator; + +import org.apache.iotdb.tsfile.read.common.TimeRange; + +import java.util.List; + +public class SampleWindowIterator implements ITimeRangeIterator { + + private final ITimeRangeIterator allTimeRangeIterator; + private final List<Integer> samplingIndexes; + + private int sampleIndex = 0; + private int timeRangeIndex = 0; + + private TimeRange curTimeRange; + + public SampleWindowIterator( + long startTime, + long endTime, + long interval, + long slidingStep, + List<Integer> samplingIndexes) { + this.samplingIndexes = samplingIndexes; + this.allTimeRangeIterator = + TimeRangeIteratorFactory.getTimeRangeIterator( + startTime, endTime, interval, slidingStep, true, false, false, true, false); + } + + @Override + public TimeRange getFirstTimeRange() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasNextTimeRange() { + return sampleIndex < samplingIndexes.size(); + } + + @Override + public TimeRange nextTimeRange() { + while (allTimeRangeIterator.hasNextTimeRange()) { + TimeRange timeRange = allTimeRangeIterator.nextTimeRange(); + if (timeRangeIndex == samplingIndexes.get(sampleIndex)) { + curTimeRange = timeRange; + timeRangeIndex++; + sampleIndex++; + break; + } + timeRangeIndex++; + } + return curTimeRange; + } + + @Override + public boolean isAscending() { + throw new UnsupportedOperationException(); + } + + @Override + public long currentOutputTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getTotalIntervalNum() { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java index a86a513936..cd0ac8a08e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.mpp.aggregation.timerangeiterator; +import java.util.List; + import static org.apache.iotdb.db.qp.utils.DateTimeUtils.MS_TO_MONTH; public class TimeRangeIteratorFactory { @@ -72,4 +74,13 @@ public class TimeRangeIteratorFactory { leftCRightO); } } + + public static ITimeRangeIterator getSampleTimeRangeIterator( + long startTime, + long endTime, + long interval, + long slidingStep, + List<Integer> samplingIndexes) { + return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java new file mode 100644 index 0000000000..d8d4ebc1f7 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.operator.process; + +import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.mpp.execution.operator.Operator; +import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.List; + +public class WindowSplitOperator implements ProcessOperator { + + protected final OperatorContext operatorContext; + + protected final Operator child; + protected TsBlock inputTsBlock; + protected boolean canCallNext; + + private final ITimeRangeIterator sampleTimeRangeIterator; + private TimeRange curTimeRange; + + private final TsBlockBuilder resultTsBlockBuilder; + + public WindowSplitOperator( + OperatorContext operatorContext, + Operator child, + ITimeRangeIterator sampleTimeRangeIterator, + List<TSDataType> outputDataTypes) { + this.operatorContext = operatorContext; + this.child = child; + this.sampleTimeRangeIterator = sampleTimeRangeIterator; + this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + public TsBlock next() { + // reset operator state + canCallNext = true; + + if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) { + // move to next time window + curTimeRange = sampleTimeRangeIterator.nextTimeRange(); + } + + if (!fetchData()) { + return null; + } else { + curTimeRange = null; + TsBlock resultTsBlock = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return resultTsBlock; + } + } + + private boolean fetchData() { + while (!consumeInput()) { + // NOTE: child.next() can only be invoked once + if (child.hasNext() && canCallNext) { + inputTsBlock = child.next(); + canCallNext = false; + } else { + return false; + } + } + return true; + } + + private boolean consumeInput() { + if (inputTsBlock == null) { + return false; + } + + inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true); + if (inputTsBlock == null) { + return false; + } + + for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) { + long time = inputTsBlock.getTimeByIndex(readIndex); + if (curTimeRange.contains(time)) { + writeData(readIndex); + } else { + inputTsBlock = inputTsBlock.subTsBlock(readIndex); + return true; + } + } + return false; + } + + private void writeData(int readIndex) { + TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); + timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex)); + ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) { + columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex); + } + resultTsBlockBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange(); + } + + @Override + public boolean isFinished() { + return !this.hasNext(); + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index a02cbd773a..4c69cd5720 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory; import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.NodeRef; import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext; @@ -53,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill; @@ -147,6 +149,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode; @@ -1575,6 +1578,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return child; } + @Override + public Operator visitWindowSplit(WindowSplitNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getInstanceContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + WindowSplitOperator.class.getSimpleName()); + + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); + ITimeRangeIterator timeRangeIterator = + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + groupByTimeParameter.getStartTime(), + groupByTimeParameter.getEndTime(), + groupByTimeParameter.getInterval(), + groupByTimeParameter.getSlidingStep(), + node.getSamplingIndexes()); + + List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes); + } + @Override public Operator visitSchemaFetchMerge( SchemaFetchMergeNode node, LocalExecutionPlanContext context) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java index 788649ffc8..6a064cd90d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode; @@ -326,4 +327,8 @@ public abstract class PlanVisitor<R, C> { public R visitDeviceViewInto(DeviceViewIntoNode node, C context) { return visitPlan(node, context); } + + public R visitWindowSplit(WindowSplitNode node, C context) { + return visitPlan(node, context); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java index fd9dc9a1e0..fa1b815cda 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.statement.crud; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.mpp.plan.constant.StatementType; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.mpp.plan.statement.Statement; @@ -81,5 +82,9 @@ public class FetchWindowSetStatement extends Statement { return visitor.visitFetchWindowSet(this, context); } - public void semanticCheck() {} + public void semanticCheck() { + if (groupByTimeParameter.hasOverlap()) { + throw new SemanticException(""); + } + } }
