This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 033b38ca0b0f736a7eaef048130bc9f4354efe2f
Author: Alima777 <[email protected]>
AuthorDate: Mon May 9 17:16:24 2022 +0800

    add RawDataAggregateOperator
---
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  42 ++++----
 .../operator/process/RawDataAggregateOperator.java | 115 ++++++++++++++++++---
 .../source/SeriesAggregateScanOperator.java        |  39 ++-----
 3 files changed, 133 insertions(+), 63 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index f083f65fe9..d38cbe88c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -55,32 +55,38 @@ public class Aggregator {
     this.inputLocationList = inputLocationList;
   }
 
-  // Used for SeriesAggregateScanOperator
+  // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public void processTsBlock(TsBlock tsBlock) {
     checkArgument(
-        step.isInputRaw(), "Step in SeriesAggregateScanOperator can only process raw input");
+        step.isInputRaw(),
+        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
     // TODO Aligned TimeSeries
-    accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+    if (inputLocationList == null) {
+      accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+    } else {
+      for (InputLocation[] inputLocations : inputLocationList) {
+        checkArgument(
+            inputLocations[0].getTsBlockIndex() == 0,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        Column[] timeValueColumn = new Column[2];
+        timeValueColumn[0] = tsBlock.getTimeColumn();
+        timeValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        accumulator.addInput(timeValueColumn, timeRange);
+      }
+    }
   }
 
-  // Used for aggregateOperator
+  // Used for AggregateOperator
   public void processTsBlocks(TsBlock[] tsBlock) {
+    checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
     for (InputLocation[] inputLocations : inputLocationList) {
-      if (step.isInputRaw()) {
-        TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()];
-        Column[] timeValueColumn = new Column[2];
-        timeValueColumn[0] = rawTsBlock.getTimeColumn();
-        timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex());
-        accumulator.addInput(timeValueColumn, timeRange);
-      } else {
-        Column[] columns = new Column[inputLocations.length];
-        for (int i = 0; i < inputLocations.length; i++) {
-          columns[i] =
-              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
-                  inputLocations[i].getValueColumnIndex());
-        }
-        accumulator.addIntermediate(columns);
+      Column[] columns = new Column[inputLocations.length];
+      for (int i = 0; i < inputLocations.length; i++) {
+        columns[i] =
+            tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                inputLocations[i].getValueColumnIndex());
       }
+      accumulator.addIntermediate(columns);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
index 5fb705dc3a..bbe6e6f041 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -41,33 +42,38 @@ import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateS
  * RawDataAggregateOperator is used to process raw data tsBlock input calculating using value
  * filter. It's possible that there is more than one tsBlock input in one time interval. And it's
  * also possible that one tsBlock can cover multiple time intervals too.
+ *
+ * <p>Since raw data query with value filter is processed by FilterOperator above TimeJoinOperator,
+ * RawDataAggregateOperator can be seen as a one-to-one (one input, one output) operator.
+ *
+ * <p>Returns the aggregation result of one time interval per call of {@code next()}.
  */
 public class RawDataAggregateOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Aggregator> aggregators;
-  private final List<Operator> children;
-
-  private final int inputOperatorsCount;
-  private final TsBlock[] inputTsBlocks;
-  private final TsBlockBuilder tsBlockBuilder;
-
+  private final Operator child;
+  private final boolean ascending;
   private ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
+  private TsBlock preCachedData;
+
+  // Used for building result tsBlock
+  private final TsBlockBuilder tsBlockBuilder;
+
   public RawDataAggregateOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
-      List<Operator> children,
+      Operator child,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter) {
     this.operatorContext = operatorContext;
     this.aggregators = aggregators;
-    this.children = children;
+    this.child = child;
+    this.ascending = ascending;
 
-    this.inputOperatorsCount = children.size();
-    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
@@ -83,26 +89,105 @@ public class RawDataAggregateOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    return child.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    // 1. Take the pending window (hasNext() only peeks at it) and reset the aggregators
+    if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
+      curTimeRange = timeRangeIterator.nextTimeRange();
+    }
+    for (Aggregator aggregator : aggregators) {
+      aggregator.reset();
+      aggregator.setTimeRange(curTimeRange);
+    }
+    // 2. Aggregate the window, pulling from the child only while it still has data
+    while (!calcFromCacheData(curTimeRange) && child.hasNext()) {
+      preCachedData = child.next();
+    }
+    // 3. Build the result, then mark the window as consumed so hasNext() moves on
+    TsBlock result =
+        AggregateOperator.updateResultTsBlockFromAggregators(
+            tsBlockBuilder, aggregators, curTimeRange);
+    curTimeRange = null;
+    return result;
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return !this.hasNext();
+  }
+
+  /** @return whether the aggregation result of the current time window is complete */
+  private boolean calcFromCacheData(TimeRange curTimeRange) {
+    // only aggregate the cached tsBlock if it has points in the current interval
+    if (preCachedData != null && satisfied(preCachedData, curTimeRange, ascending)) {
+      // skip points that cannot be calculated
+      preCachedData = skipOutOfTimeRangePoints(preCachedData, curTimeRange, ascending);
+
+      for (Aggregator aggregator : aggregators) {
+        // current agg method has been calculated
+        if (aggregator.hasFinalResult()) {
+          continue;
+        }
+
+        aggregator.processTsBlock(preCachedData);
+      }
+    }
+    // The result is calculated from the cache
+    return (preCachedData != null
+            && (ascending
+                ? preCachedData.getEndTime() >= curTimeRange.getMax()
+                : preCachedData.getStartTime() < curTimeRange.getMin()))
+        || isEndCalc(aggregators);
+  }
+
+  // skip points that cannot be calculated
+  public static TsBlock skipOutOfTimeRangePoints(
+      TsBlock tsBlock, TimeRange curTimeRange, boolean ascending) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
+    if (ascending) {
+      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
+        tsBlockIterator.next();
+      }
+    } else {
+      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+        tsBlockIterator.next();
+      }
+    }
+    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
+  }
+
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
+    if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
+      return false;
+    }
+
+    return ascending
+        ? (tsBlockIterator.getEndTime() >= timeRange.getMin()
+            && tsBlockIterator.currentTime() < timeRange.getMax())
+        : (tsBlockIterator.getStartTime() < timeRange.getMax()
+            && tsBlockIterator.currentTime() >= timeRange.getMin());
+  }
+
+  public static boolean isEndCalc(List<Aggregator> aggregators) {
+    for (Aggregator aggregator : aggregators) {
+      if (!aggregator.hasFinalResult()) {
+        return false;
+      }
+    }
+    return true;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index f86b5e5deb..23c8bc28be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -44,6 +44,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.isEndCalc;
+import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.skipOutOfTimeRangePoints;
+
 /**
  * This operator is responsible to do the aggregation calculation for one series based on global
  * time range and time split parameter.
@@ -250,18 +253,18 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             && (ascending
                 ? preCachedData.getEndTime() >= curTimeRange.getMax()
                 : preCachedData.getStartTime() < curTimeRange.getMin()))
-        || isEndCalc();
+        || isEndCalc(aggregators);
   }
 
   @SuppressWarnings("squid:S3776")
   private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
+    if (tsBlock == null || !satisfied(tsBlock, curTimeRange, ascending)) {
       return;
     }
 
     // skip points that cannot be calculated
-    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange, ascending);
 
     for (Aggregator aggregator : aggregators) {
       // current agg method has been calculated
@@ -278,22 +281,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     }
   }
 
-  // skip points that cannot be calculated
-  private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) {
-    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
-    if (ascending) {
-      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
-        tsBlockIterator.next();
-      }
-    } else {
-      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
-        tsBlockIterator.next();
-      }
-    }
-    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
-  }
-
-  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) {
     TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
@@ -313,15 +301,6 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     return true;
   }
 
-  private boolean isEndCalc() {
-    for (Aggregator aggregator : aggregators) {
-      if (!aggregator.hasFinalResult()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
     while (seriesScanUtil.hasNextPage()) {
@@ -342,7 +321,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
           calcFromStatistics(pageStatistics);
           seriesScanUtil.skipCurrentPage();
-          if (isEndCalc()) {
+          if (isEndCalc(aggregators)) {
             return true;
           }
           continue;
@@ -369,7 +348,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
-      if (isEndCalc()
+      if (isEndCalc(aggregators)
           || (tsBlockIterator.hasNext()
               && (ascending
                   ? tsBlockIterator.currentTime() >= curTimeRange.getMax()

Reply via email to