This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit af1cc09402f3a15ea7ce47ce5af079fe2a0b3d3b
Author: Alima777 <[email protected]>
AuthorDate: Fri Apr 29 20:04:46 2022 +0800

    implement part accumulator 2
---
 .../db/mpp/operator/aggregation/Accumulator.java   |  3 +-
 .../aggregation/FirstValueAccumulator.java         | 90 +++++++++++++++++++++-
 .../operator/aggregation/LastValueAccumulator.java | 90 +++++++++++++++++++++-
 .../operator/aggregation/MaxTimeAccumulator.java   | 80 ++++++++++++++++++-
 .../operator/aggregation/MinTimeAccumulator.java   | 80 ++++++++++++++++++-
 5 files changed, 338 insertions(+), 5 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index 5833a75959..eaa7dd99e5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -59,7 +59,8 @@ public interface Accumulator {
   void reset();
 
   /**
-   * For first_value or last_value in decreasing order, we can get final 
result by the first record.
+   * This method can only be used in seriesAggregateScanOperator. For 
first_value or last_value in
+   * decreasing order, we can get final result by the first record.
    */
   boolean hasFinalResult();
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index 49ad6a5133..6af6164b49 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -17,5 +17,93 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class 
FirstValueAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class FirstValueAccumulator implements Accumulator {
+
+  private boolean hasCandidateResult;
+  private TsPrimitiveType firstValue;
+  private long minTime = Long.MAX_VALUE;
+
+  public FirstValueAccumulator(TSDataType seriesDataType) {
+    firstValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    long curTime = column[0].getLong(0);
+    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+      updateFirstValue(column[1].getObject(0), curTime);
+    }
+  }
+
+  // partialResult should be like: | FirstValue | MinTime |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 2) {
+      throw new IllegalArgumentException("partialResult of FirstValue should 
be 2");
+    }
+    updateFirstValue(partialResult[0].getObject(0), 
partialResult[1].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateFirstValue(statistics.getFirstValue(), statistics.getStartTime());
+  }
+
+  // finalResult should be single column, like: | finalFirstValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    firstValue.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be double in FirstValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeObject(firstValue.getValue());
+    columnBuilders[1].writeLong(minTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeObject(firstValue.getValue());
+  }
+
+  @Override
+  public void reset() {
+    this.minTime = Long.MAX_VALUE;
+    this.firstValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {firstValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return firstValue.getDataType();
+  }
+
+  private void updateFirstValue(Object value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setObject(value);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index 97183aae52..1ecd65ae61 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -17,5 +17,93 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class 
LastValueAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class LastValueAccumulator implements Accumulator {
+
+  private TsPrimitiveType lastValue;
+  private long maxTime = Long.MIN_VALUE;
+
+  public LastValueAccumulator(TSDataType seriesDataType) {
+    lastValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateLastValue(column[1].getObject(0), curTime);
+      }
+    }
+  }
+
+  // partialResult should be like: | LastValue | MaxTime |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 2) {
+      throw new IllegalArgumentException("partialResult of LastValue should be 
2");
+    }
+    updateLastValue(partialResult[0].getObject(0), 
partialResult[1].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateLastValue(statistics.getLastValue(), statistics.getEndTime());
+  }
+
+  // finalResult should be single column, like: | finalLastValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    lastValue.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be double in LastValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeObject(lastValue.getValue());
+    columnBuilders[1].writeLong(maxTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeObject(lastValue.getValue());
+  }
+
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+    this.lastValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {lastValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return lastValue.getDataType();
+  }
+
+  private void updateLastValue(Object value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setObject(value);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index c5d03f8442..3addbf26d9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -17,5 +17,83 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class 
MaxTimeAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class MaxTimeAccumulator implements Accumulator {
+
+  private long maxTime = Long.MIN_VALUE;
+
+  public MaxTimeAccumulator() {}
+
+  // Column should be like: | Time |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateMaxTime(curTime);
+      }
+    }
+  }
+
+  // partialResult should be like: | partialMaxTimeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MaxTime should be 
1");
+    }
+    updateMaxTime(partialResult[0].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateMaxTime(statistics.getEndTime());
+  }
+
+  // finalResult should be single column, like: | finalMaxTime |
+  @Override
+  public void setFinal(Column finalResult) {
+    maxTime = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in maxTimeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(maxTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(maxTime);
+  }
+
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+
+  private void updateMaxTime(long curTime) {
+    maxTime = Math.max(maxTime, curTime);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index 95bf611acf..893d8436eb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -17,5 +17,83 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class 
MinTimeAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class MinTimeAccumulator implements Accumulator {
+
+  private boolean hasCandidateResult;
+  private long minTime = Long.MAX_VALUE;
+
+  public MinTimeAccumulator() {}
+
+  // Column should be like: | Time |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    long curTime = column[0].getLong(0);
+    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+      updateMinTime(curTime);
+    }
+  }
+
+  // partialResult should be like: | partialMinTimeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MinTime should be 
1");
+    }
+    updateMinTime(partialResult[0].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateMinTime(statistics.getStartTime());
+  }
+
+  // finalResult should be single column, like: | finalMinTime |
+  @Override
+  public void setFinal(Column finalResult) {
+    minTime = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in minTimeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(minTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(minTime);
+  }
+
+  @Override
+  public void reset() {
+    this.minTime = Long.MAX_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+
+  private void updateMinTime(long curTime) {
+    hasCandidateResult = true;
+    minTime = Math.min(minTime, curTime);
+  }
 }

Reply via email to