This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch OffsetOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94e06c7db98c559d849566d97c7bd933ed75479a
Author: JackieTien97 <[email protected]>
AuthorDate: Tue May 3 16:22:22 2022 +0800

    [IOTDB-3082] Implementation of OffsetOperator
---
 .../execution/operator/process/OffsetOperator.java |  34 +++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  10 +-
 .../mpp/execution/operator/OffsetOperatorTest.java | 197 +++++++++++++++++++++
 3 files changed, 234 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 280d372500..5439f46773 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -18,40 +18,62 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 public class OffsetOperator implements ProcessOperator {
 
+  private final OperatorContext operatorContext;
+  private long remainingOffset;
+  private final Operator child;
+
+  /**
+   * Creates an operator that drops the first {@code offset} rows produced by {@code child}.
+   *
+   * @param operatorContext context of this operator; must not be null
+   * @param offset number of leading rows to skip; must be at least zero
+   * @param child upstream operator supplying the blocks; must not be null
+   * @throws NullPointerException if {@code operatorContext} or {@code child} is null
+   * @throws IllegalArgumentException if {@code offset} is negative
+   */
+  public OffsetOperator(OperatorContext operatorContext, long offset, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    // The message names the argument actually being validated (offset, not limit).
+    checkArgument(offset >= 0, "offset must be at least zero");
+    this.remainingOffset = offset;
+    this.child = requireNonNull(child, "child operator is null");
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    return child.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    // Skip rows from the child's block until the configured offset has been consumed.
+    TsBlock block = child.next();
+    if (remainingOffset > 0) {
+      // Take the minimum as a long before narrowing: casting remainingOffset to int first
+      // overflows (possibly to a negative value) when the offset exceeds Integer.MAX_VALUE.
+      // The result never exceeds the block's position count, so the cast is safe.
+      int offset = (int) Math.min(remainingOffset, block.getPositionCount());
+      remainingOffset -= offset;
+      return block.getRegion(offset, block.getPositionCount() - offset);
+    } else {
+      return block;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return child.hasNext();
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return child.isFinished();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index fb876a35ac..40441350d5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
@@ -371,7 +372,14 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext 
context) {
-      return super.visitOffset(node, context);
+      Operator child = node.getChild().accept(this, context);
+      return new OffsetOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              OffsetOperator.class.getSimpleName()),
+          node.getOffset(),
+          child);
     }
 
     @Override
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
new file mode 100644
index 0000000000..cc97974aaf
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class OffsetOperatorTest {
+
+  private static final String TIME_JOIN_OPERATOR_TEST_SG = 
"root.LimitOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, 
TIME_JOIN_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          4, new PlanNodeId("4"), OffsetOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          5, new PlanNodeId("5"), LimitOperator.class.getSimpleName());
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+              OrderBy.TIMESTAMP_ASC,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
+              new AscTimeComparator());
+
+      OffsetOperator offsetOperator =
+          new OffsetOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3), 100, 
timeJoinOperator);
+
+      LimitOperator limitOperator =
+          new LimitOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4), 250, 
offsetOperator);
+      int count = 0;
+      while (limitOperator.hasNext()) {
+        TsBlock tsBlock = limitOperator.next();
+        assertEquals(2, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        if (count < 5) {
+          assertEquals(0, tsBlock.getPositionCount());
+        } else if (count < 17) {
+          assertEquals(20, tsBlock.getPositionCount());
+        } else {
+          assertEquals(10, tsBlock.getPositionCount());
+        }
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          if (expectedTime < 200) {
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else {
+            assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+          }
+        }
+        count++;
+      }
+      assertEquals(18, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}

Reply via email to