This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch OffsetOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 94e06c7db98c559d849566d97c7bd933ed75479a Author: JackieTien97 <[email protected]> AuthorDate: Tue May 3 16:22:22 2022 +0800 [IOTDB-3082] Implementation of OffsetOperator --- .../execution/operator/process/OffsetOperator.java | 34 +++- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 10 +- .../mpp/execution/operator/OffsetOperatorTest.java | 197 +++++++++++++++++++++ 3 files changed, 234 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java index 280d372500..5439f46773 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java @@ -18,40 +18,62 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process; +import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + public class OffsetOperator implements ProcessOperator { + private final OperatorContext operatorContext; + private long remainingOffset; + private final Operator child; + + public OffsetOperator(OperatorContext operatorContext, long offset, Operator child) { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + checkArgument(offset >= 0, "limit must be at least zero"); + this.remainingOffset = offset; + this.child = requireNonNull(child, "child operator is null"); + } + @Override public OperatorContext getOperatorContext() { - return null; + return operatorContext; } @Override public ListenableFuture<Void> isBlocked() { - return 
ProcessOperator.super.isBlocked(); + return child.isBlocked(); } @Override public TsBlock next() { - return null; + TsBlock block = child.next(); + if (remainingOffset > 0) { + int offset = Math.min((int) remainingOffset, block.getPositionCount()); + remainingOffset -= offset; + return block.getRegion(offset, block.getPositionCount() - offset); + } else { + return block; + } } @Override public boolean hasNext() { - return false; + return child.hasNext(); } @Override public void close() throws Exception { - ProcessOperator.super.close(); + child.close(); } @Override public boolean isFinished() { - return false; + return child.isFinished(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index fb876a35ac..40441350d5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger; @@ -371,7 +372,14 @@ public class LocalExecutionPlanner { @Override public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) { - return super.visitOffset(node, context); + Operator child = node.getChild().accept(this, context); + return new OffsetOperator( + context.instanceContext.addOperatorContext( + 
context.getNextOperatorId(), + node.getPlanNodeId(), + OffsetOperator.class.getSimpleName()), + node.getOffset(), + child); } @Override diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java new file mode 100644 index 0000000000..cc97974aaf --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.mpp.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.common.PlanFragmentId; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; +import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class OffsetOperatorTest { + + private static final String TIME_JOIN_OPERATOR_TEST_SG = "root.LimitOperatorTest"; + private final List<String> deviceIds = new ArrayList<>(); + private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + + private final List<TsFileResource> seqResources = new ArrayList<>(); + private final List<TsFileResource> unSeqResources = new ArrayList<>(); + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unSeqResources, TIME_JOIN_OPERATOR_TEST_SG); + } + + @After + public void tearDown() throws IOException { + SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); + } + + @Test + public void batchTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + MeasurementPath measurementPath1 = + new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + Set<String> allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + 
fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + fragmentInstanceContext.addOperatorContext( + 2, planNodeId2, SeriesScanOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 4, new PlanNodeId("4"), OffsetOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 5, new PlanNodeId("5"), LimitOperator.class.getSimpleName()); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + planNodeId1, + measurementPath1, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(0), + null, + null, + true); + seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath2 = + new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + planNodeId2, + measurementPath2, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(1), + null, + null, + true); + seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + TimeJoinOperator timeJoinOperator = + new TimeJoinOperator( + fragmentInstanceContext.getOperatorContexts().get(2), + Arrays.asList(seriesScanOperator1, seriesScanOperator2), + OrderBy.TIMESTAMP_ASC, + Arrays.asList(TSDataType.INT32, TSDataType.INT32), + Arrays.asList( + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())), + new AscTimeComparator()); + + OffsetOperator offsetOperator = + new OffsetOperator( + fragmentInstanceContext.getOperatorContexts().get(3), 100, timeJoinOperator); + + LimitOperator limitOperator = + new LimitOperator( + 
fragmentInstanceContext.getOperatorContexts().get(4), 250, offsetOperator); + int count = 0; + while (limitOperator.hasNext()) { + TsBlock tsBlock = limitOperator.next(); + assertEquals(2, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof IntColumn); + assertTrue(tsBlock.getColumn(1) instanceof IntColumn); + if (count < 5) { + assertEquals(0, tsBlock.getPositionCount()); + } else if (count < 17) { + assertEquals(20, tsBlock.getPositionCount()); + } else { + assertEquals(10, tsBlock.getPositionCount()); + } + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = i + 20L * count; + assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); + if (expectedTime < 200) { + assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i)); + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i)); + } else { + assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i)); + } + } + count++; + } + assertEquals(18, count); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } +}
