This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9f03e74dae5a75e75cf5e4ad988d65ee1126cb8e
Author: Minghui Liu <[email protected]>
AuthorDate: Mon Nov 14 16:21:20 2022 +0800

    implement WindowConcatOperator
---
 .../main/java/org/apache/iotdb/SessionExample.java |  4 +-
 .../process/{ => window}/WindowConcatOperator.java | 44 +++++++++---
 .../operator/process/window/WindowSliceQueue.java  | 80 ++++++++++++++++++++++
 .../process/{ => window}/WindowSplitOperator.java  |  3 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  4 +-
 5 files changed, 121 insertions(+), 14 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 9baf3a8ad2..a488d292a6 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -72,8 +72,8 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", 
"root.sg1.d2.s1");
-    List<Integer> indexes = Arrays.asList(0, 1, 2, 3);
+    List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", 
"root.sg1.d1.s2");
+    List<Integer> indexes = Arrays.asList(1, 2, 6, 7);
     List<SessionDataSet> windowBatch =
         session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes);
     for (SessionDataSet window : windowBatch) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
similarity index 64%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
index 269b579feb..4dc9f14f2d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
@@ -17,15 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.window;
 
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import java.util.List;
 
@@ -34,13 +34,11 @@ public class WindowConcatOperator implements 
ProcessOperator {
   protected final OperatorContext operatorContext;
 
   protected final Operator child;
-  protected TsBlock inputTsBlock;
-  protected boolean canCallNext;
 
   private final ITimeRangeIterator sampleTimeRangeIterator;
   private TimeRange curTimeRange;
 
-  private final TsBlockBuilder resultTsBlockBuilder;
+  private final WindowSliceQueue windowSliceQueue;
 
   public WindowConcatOperator(
       OperatorContext operatorContext,
@@ -50,7 +48,7 @@ public class WindowConcatOperator implements ProcessOperator {
     this.operatorContext = operatorContext;
     this.child = child;
     this.sampleTimeRangeIterator = sampleTimeRangeIterator;
-    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    this.windowSliceQueue = new WindowSliceQueue(outputDataTypes);
   }
 
   @Override
@@ -60,17 +58,45 @@ public class WindowConcatOperator implements 
ProcessOperator {
 
   @Override
   public TsBlock next() {
-    return child.next();
+    if (!child.hasNext()) {
+      curTimeRange = null;
+      return windowSliceQueue.outputWindow();
+    }
+
+    TsBlock inputTsBlock = child.next();
+    if (inputTsBlock == null) {
+      return null;
+    }
+
+    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+      windowSliceQueue.updateTimeRange(curTimeRange);
+    }
+
+    if (inputTsBlock.getStartTime() > curTimeRange.getMax()) {
+      TsBlock outputWindow = windowSliceQueue.outputWindow();
+      if (sampleTimeRangeIterator.hasNextTimeRange()) {
+        curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+        windowSliceQueue.updateTimeRange(curTimeRange);
+      } else {
+        curTimeRange = null;
+      }
+      windowSliceQueue.processTsBlock(inputTsBlock);
+      return outputWindow;
+    } else {
+      windowSliceQueue.processTsBlock(inputTsBlock);
+      return null;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return child.hasNext();
+    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
   }
 
   @Override
   public boolean isFinished() {
-    return child.isFinished();
+    return !this.hasNext();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
new file mode 100644
index 0000000000..59531d2ced
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.window;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class WindowSliceQueue {
+
+  // cached window slice
+  private final Deque<TsBlock> deque = new LinkedList<>();
+
+  private TimeRange curTimeRange;
+
+  private final TsBlockBuilder windowBuilder;
+
+  public WindowSliceQueue(List<TSDataType> dataTypeList) {
+    this.windowBuilder = new TsBlockBuilder(dataTypeList);
+  }
+
+  public void processTsBlock(TsBlock tsBlock) {
+    deque.addLast(tsBlock);
+  }
+
+  public void updateTimeRange(TimeRange curTimeRange) {
+    this.curTimeRange = curTimeRange;
+    evictingExpiredSlice();
+  }
+
+  public void evictingExpiredSlice() {
+    while (!deque.isEmpty() && 
!curTimeRange.contains(deque.getFirst().getStartTime())) {
+      deque.removeFirst();
+    }
+  }
+
+  public TsBlock outputWindow() {
+    windowBuilder.reset();
+
+    TimeColumnBuilder timeColumnBuilder = windowBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = windowBuilder.getValueColumnBuilders();
+    int valueColumnCount = columnBuilders.length;
+
+    for (TsBlock windowSlice : deque) {
+      int positionCount = windowSlice.getPositionCount();
+      for (int index = 0; index < positionCount; index++) {
+        timeColumnBuilder.write(windowSlice.getTimeColumn(), index);
+        for (int columnIndex = 0; columnIndex < valueColumnCount; 
columnIndex++) {
+          
columnBuilders[columnIndex].write(windowSlice.getColumn(columnIndex), index);
+        }
+      }
+      windowBuilder.declarePositions(positionCount);
+    }
+    return windowBuilder.build();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
similarity index 97%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
index 6b9544b1a1..039822429d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.window;
 
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index d839451544..eb50d6ab37 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,8 +54,6 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import 
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -91,6 +89,8 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.window.WindowConcatOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.window.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;

Reply via email to