This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c526024  [IOTDB-514] Extend aggregation (#833)
c526024 is described below

commit c526024ecd60c347544435c8c2178042c33b591c
Author: 张宇欣 <[email protected]>
AuthorDate: Tue Feb 25 09:01:30 2020 +0800

    [IOTDB-514] Extend aggregation (#833)
    
    * add merge function to AggregateResult and add timestamp attribute for 
FirstValueAggrResult and LastValueAggrResult.
---
 .../SystemDesign/1-TsFile/3-Write.md               |   8 +-
 .../db/query/aggregation/AggregateResult.java      |   6 +
 .../db/query/aggregation/impl/AvgAggrResult.java   |   6 +
 .../db/query/aggregation/impl/CountAggrResult.java |   8 +-
 .../aggregation/impl/FirstValueAggrResult.java     |  22 ++-
 .../aggregation/impl/LastValueAggrResult.java      |  23 ++-
 .../query/aggregation/impl/MaxTimeAggrResult.java  |   8 +
 .../query/aggregation/impl/MaxValueAggrResult.java |   5 +
 .../query/aggregation/impl/MinTimeAggrResult.java  |   8 +
 .../query/aggregation/impl/MinValueAggrResult.java |   8 +
 .../db/query/aggregation/impl/SumAggrResult.java   |   6 +
 .../db/query/aggregation/AggregateResultTest.java  | 183 +++++++++++++++++++++
 .../iotdb/db/query/component/SimpleFileWriter.java |  54 ------
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  14 +-
 .../apache/iotdb/tsfile/write/page/PageWriter.java |  12 +-
 15 files changed, 290 insertions(+), 81 deletions(-)

diff --git a/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md 
b/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
index fe2fd7f..7a27f66 100644
--- a/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
+++ b/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
@@ -25,7 +25,7 @@
 
 TsFile 的写入流程如下图所示:
 
-<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; 
margin-right:auto; display:block;" 
src="https://user-images.githubusercontent.com/19167280/73625238-efba2980-467e-11ea-927e-a7021f8153af.png";>
+< img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; 
margin-right:auto; display:block;" 
src="https://user-images.githubusercontent.com/19167280/73625238-efba2980-467e-11ea-927e-a7021f8153af.png";>
 
 其中,每个设备对应一个 ChunkGroupWriter,每个传感器对应一个 ChunkWriter。
 
@@ -41,11 +41,11 @@ TsFile 文件层的写入接口有两种
 
 * TsFileWriter.write(TSRecord record)
 
-       写入一个设备一个时间戳多个测点。
+ 写入一个设备一个时间戳多个测点。
 
 * TsFileWriter.write(RowBatch rowBatch)
 
-       写入一个设备多个时间戳多个测点。
+ 写入一个设备多个时间戳多个测点。
 
 当调用 write 接口时,这个设备的数据会交给对应的 ChunkGroupWriter,其中的每个测点会交给对应的 ChunkWriter 
进行写入。ChunkWriter 完成编码和打包(生成 Page)。
 
@@ -62,4 +62,4 @@ TsFile 文件层的写入接口有两种
 
 * TsFileWriter.close()
 
-根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部,最后关闭文件。
+根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部,最后关闭文件。
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7ada955..e24606b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -94,6 +94,11 @@ public abstract class AggregateResult {
    */
   public abstract boolean isCalculatedAggregationResult();
 
+  /**
+   * Merge another aggregateResult into this
+   */
+  public abstract void merge(AggregateResult another);
+
   public void reset() {
     hasResult = false;
   }
@@ -209,4 +214,5 @@ public abstract class AggregateResult {
   protected boolean hasResult() {
     return hasResult;
   }
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index cec12fd..dbc677e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -112,4 +112,10 @@ public class AvgAggrResult extends AggregateResult {
   public boolean isCalculatedAggregationResult() {
     return false;
   }
+
+  @Override
+  public void merge(AggregateResult another) {
+    AvgAggrResult anotherAvg = (AvgAggrResult) another;
+    avg = (avg * cnt + anotherAvg.avg * anotherAvg.cnt) / (cnt + 
anotherAvg.cnt);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 0e855da..34ffabb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
-public class CountAggrResult extends AggregateResult {
+          public class CountAggrResult extends AggregateResult {
 
   public CountAggrResult() {
     super(TSDataType.INT64);
@@ -86,4 +86,10 @@ public class CountAggrResult extends AggregateResult {
   public boolean isCalculatedAggregationResult() {
     return false;
   }
+
+  @Override
+  public void merge(AggregateResult another) {
+    CountAggrResult anotherCount = (CountAggrResult) another;
+    setLongValue(anotherCount.getResult() + this.getResult());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index c86e078..e8f06fc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 
 public class FirstValueAggrResult extends AggregateResult {
 
+  // timestamp of current value
+  private long timestamp = Long.MAX_VALUE;
+
   public FirstValueAggrResult(TSDataType dataType) {
     super(dataType);
     reset();
@@ -40,17 +43,14 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromStatistics(Statistics statistics)
-      throws QueryProcessException {
+  public void updateResultFromStatistics(Statistics statistics) {
     if (hasResult()) {
       return;
     }
 
     Object firstVal = statistics.getFirstValue();
-    if (firstVal == null) {
-      throw new QueryProcessException("ChunkMetaData contains no FIRST value");
-    }
     setValue(firstVal);
+    timestamp = statistics.getStartTime();
   }
 
   @Override
@@ -60,6 +60,7 @@ public class FirstValueAggrResult extends AggregateResult {
     }
     if (dataInThisPage.hasCurrent()) {
       setValue(dataInThisPage.currentValue());
+      timestamp = dataInThisPage.currentTime();
     }
   }
 
@@ -70,6 +71,7 @@ public class FirstValueAggrResult extends AggregateResult {
     }
     if (dataInThisPage.hasCurrent() && dataInThisPage.currentTime() < bound) {
       setValue(dataInThisPage.currentValue());
+      timestamp = dataInThisPage.currentTime();
       dataInThisPage.next();
     }
   }
@@ -85,6 +87,7 @@ public class FirstValueAggrResult extends AggregateResult {
       Object value = dataReader.getValueInTimestamp(timestamps[i]);
       if (value != null) {
         setValue(value);
+        timestamp = timestamps[i];
         break;
       }
     }
@@ -94,4 +97,13 @@ public class FirstValueAggrResult extends AggregateResult {
   public boolean isCalculatedAggregationResult() {
     return hasResult();
   }
+
+  @Override
+  public void merge(AggregateResult another) {
+    FirstValueAggrResult anotherFirst = (FirstValueAggrResult) another;
+    if(this.getValue() == null || this.timestamp > anotherFirst.timestamp){
+      this.setValue( anotherFirst.getValue() );
+      this.timestamp = anotherFirst.timestamp;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index f68bdf3..9154545 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -28,6 +28,9 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 
 public class LastValueAggrResult extends AggregateResult {
 
+  //timestamp of current value
+  private long timestamp = Long.MIN_VALUE;
+
   public LastValueAggrResult(TSDataType dataType) {
     super(dataType);
     reset();
@@ -42,6 +45,7 @@ public class LastValueAggrResult extends AggregateResult {
   public void updateResultFromStatistics(Statistics statistics) {
     Object lastVal = statistics.getLastValue();
     setValue(lastVal);
+    timestamp = statistics.getEndTime();
   }
 
   @Override
@@ -51,7 +55,7 @@ public class LastValueAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(BatchData dataInThisPage, long bound) {
-    long time = -1;
+    long time = Long.MIN_VALUE;
     Object lastVal = null;
     while (dataInThisPage.hasCurrent() && dataInThisPage.currentTime() < 
bound) {
       time = dataInThisPage.currentTime();
@@ -59,8 +63,9 @@ public class LastValueAggrResult extends AggregateResult {
       dataInThisPage.next();
     }
 
-    if (time != -1) {
+    if (time != Long.MIN_VALUE) {
       setValue(lastVal);
+      timestamp = time;
     }
   }
 
@@ -68,7 +73,7 @@ public class LastValueAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(long[] timestamps, int length,
       IReaderByTimestamp dataReader) throws IOException {
 
-    long time = -1;
+    long time = Long.MIN_VALUE;
     Object lastVal = null;
     for (int i = 0; i < length; i++) {
       Object value = dataReader.getValueInTimestamp(timestamps[i]);
@@ -77,8 +82,9 @@ public class LastValueAggrResult extends AggregateResult {
         lastVal = value;
       }
     }
-    if (time != -1) {
+    if (time != Long.MIN_VALUE) {
       setValue(lastVal);
+      timestamp = time;
     }
   }
 
@@ -87,4 +93,13 @@ public class LastValueAggrResult extends AggregateResult {
     return false;
   }
 
+  @Override
+  public void merge(AggregateResult another) {
+    LastValueAggrResult anotherFirst = (LastValueAggrResult) another;
+    if(this.getValue() == null || this.timestamp < anotherFirst.timestamp){
+      this.setValue( anotherFirst.getValue() );
+      this.timestamp = anotherFirst.timestamp;
+    }
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 0b64fa7..530a1af 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -85,6 +85,14 @@ public class MaxTimeAggrResult extends AggregateResult {
     return false;
   }
 
+  @Override
+  public void merge(AggregateResult another) {
+    MaxTimeAggrResult anotherMaxTime = (MaxTimeAggrResult) another;
+    if (anotherMaxTime.getResult() != null) {
+      this.updateMaxTimeResult(anotherMaxTime.getResult());
+    }
+  }
+
   private void updateMaxTimeResult(long value) {
     if (!hasResult() || value >= getLongValue()) {
       setLongValue(value);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 8413ffc..ff360a2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -83,6 +83,11 @@ public class MaxValueAggrResult extends AggregateResult {
     return false;
   }
 
+  @Override
+  public void merge(AggregateResult another) {
+     this.updateResult((Comparable<Object>)another.getResult());
+  }
+
   private void updateResult(Comparable<Object> maxVal) {
     if (maxVal == null) {
       return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index d2346f7..5c3ac9d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -82,4 +82,12 @@ public class MinTimeAggrResult extends AggregateResult {
     return hasResult();
   }
 
+  @Override
+  public void merge(AggregateResult another) {
+    MinTimeAggrResult anotherMinTime = (MinTimeAggrResult) another;
+    if (hasResult() && anotherMinTime.hasResult() && getResult() > 
anotherMinTime.getResult()) {
+      setLongValue(anotherMinTime.getResult());
+    }
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 338c16f..525d860 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -78,6 +78,14 @@ public class MinValueAggrResult extends AggregateResult {
     return false;
   }
 
+  @Override
+  public void merge(AggregateResult another) {
+    if (another.getResult() != null) {
+      Object value = another.getResult();
+      this.updateResult((Comparable<Object>) value);
+    }
+  }
+
   private void updateResult(Comparable<Object> minVal) {
     if (minVal == null) {
       return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index bc1fbec..f21fe04 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -104,4 +104,10 @@ public class SumAggrResult extends AggregateResult {
   public boolean isCalculatedAggregationResult() {
     return false;
   }
+
+  @Override
+  public void merge(AggregateResult another) {
+    SumAggrResult anotherSum = (SumAggrResult) another;
+    setDoubleValue(getDoubleValue() + anotherSum.getDoubleValue());
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
 
b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
new file mode 100644
index 0000000..698b943
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.factory.AggreResultFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AggregateResultTest {
+
+  @Test
+  public void avgAggrResultTest() throws QueryProcessException {
+    AggregateResult avgAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.AVG, TSDataType.DOUBLE);
+    AggregateResult avgAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.AVG, TSDataType.DOUBLE);
+
+    Statistics statistics = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics.update(1l,1d);
+
+    avgAggrResult1.updateResultFromStatistics(statistics);
+    avgAggrResult2.updateResultFromStatistics(statistics);
+    avgAggrResult1.merge(avgAggrResult2);
+    Assert.assertEquals(1d, (double)avgAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void maxValueAggrResultTest() throws QueryProcessException {
+    AggregateResult maxValueAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_VALUE, 
TSDataType.DOUBLE);
+    AggregateResult maxValueAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_VALUE, 
TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(1l,2d);
+
+    maxValueAggrResult1.updateResultFromStatistics(statistics1);
+    maxValueAggrResult2.updateResultFromStatistics(statistics2);
+    maxValueAggrResult1.merge(maxValueAggrResult2);
+
+    Assert.assertEquals(2d, (double)maxValueAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void maxTimeAggrResultTest() throws QueryProcessException {
+    AggregateResult maxTimeAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.DOUBLE);
+    AggregateResult maxTimeAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(2l,1d);
+
+    maxTimeAggrResult1.updateResultFromStatistics(statistics1);
+    maxTimeAggrResult2.updateResultFromStatistics(statistics2);
+    maxTimeAggrResult1.merge(maxTimeAggrResult2);
+
+    Assert.assertEquals(2l, (long)maxTimeAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void minValueAggrResultTest() throws QueryProcessException {
+    AggregateResult minValueAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_VALUE, 
TSDataType.DOUBLE);
+    AggregateResult minValueAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_VALUE, 
TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(1l,2d);
+
+    minValueAggrResult1.updateResultFromStatistics(statistics1);
+    minValueAggrResult2.updateResultFromStatistics(statistics2);
+    minValueAggrResult1.merge(minValueAggrResult2);
+
+    Assert.assertEquals(1d, (double)minValueAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void minTimeAggrResultTest() throws QueryProcessException {
+    AggregateResult minTimeAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.DOUBLE);
+    AggregateResult minTimeAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(2l,1d);
+
+    minTimeAggrResult1.updateResultFromStatistics(statistics1);
+    minTimeAggrResult2.updateResultFromStatistics(statistics2);
+    minTimeAggrResult1.merge(minTimeAggrResult2);
+
+    Assert.assertEquals(1l, (long)minTimeAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void countAggrResultTest() throws QueryProcessException {
+    AggregateResult countAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.COUNT, TSDataType.INT64);
+    AggregateResult countAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.COUNT, TSDataType.INT64);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.INT64);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.INT64);
+    statistics1.update(1l, 1l);
+    statistics2.update(1l,1l);
+
+    countAggrResult1.updateResultFromStatistics(statistics1);
+    countAggrResult2.updateResultFromStatistics(statistics2);
+    countAggrResult1.merge(countAggrResult2);
+
+    Assert.assertEquals(2, (long)countAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void sumAggrResultTest() throws QueryProcessException {
+    AggregateResult sumAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.SUM, TSDataType.DOUBLE);
+    AggregateResult sumAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.SUM, TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(1l,2d);
+
+    sumAggrResult1.updateResultFromStatistics(statistics1);
+    sumAggrResult2.updateResultFromStatistics(statistics2);
+    sumAggrResult1.merge(sumAggrResult2);
+
+    Assert.assertEquals(3d, (double)sumAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void firstValueAggrResultTest() throws QueryProcessException {
+    AggregateResult firstValueAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.FIRST_VALUE, 
TSDataType.DOUBLE);
+    AggregateResult firstValueAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.FIRST_VALUE, 
TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(2l,2d);
+
+    firstValueAggrResult1.updateResultFromStatistics(statistics1);
+    firstValueAggrResult2.updateResultFromStatistics(statistics2);
+    firstValueAggrResult1.merge(firstValueAggrResult2);
+
+    Assert.assertEquals(1d, (double)firstValueAggrResult1.getResult(), 0.01);
+  }
+
+  @Test
+  public void lastValueAggrResultTest() throws QueryProcessException {
+    AggregateResult lastValueAggrResult1 = 
AggreResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE, 
TSDataType.DOUBLE);
+    AggregateResult lastValueAggrResult2 = 
AggreResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE, 
TSDataType.DOUBLE);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1l, 1d);
+    statistics2.update(2l,2d);
+
+    lastValueAggrResult1.updateResultFromStatistics(statistics1);
+    lastValueAggrResult2.updateResultFromStatistics(statistics2);
+    lastValueAggrResult1.merge(lastValueAggrResult2);
+
+    Assert.assertEquals(2d, (double)lastValueAggrResult1.getResult(), 0.01);
+  }
+
+
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/component/SimpleFileWriter.java
 
b/server/src/test/java/org/apache/iotdb/db/query/component/SimpleFileWriter.java
deleted file mode 100644
index d7e114f..0000000
--- 
a/server/src/test/java/org/apache/iotdb/db/query/component/SimpleFileWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.component;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.junit.Ignore;
-
-@Ignore
-public class SimpleFileWriter {
-
-  public static void writeFile(String path, byte[] bytes) throws IOException {
-    File file = new File(path);
-    if (file.exists()) {
-      file.delete();
-    }
-    file.createNewFile();
-    FileOutputStream fileOutputStream = new FileOutputStream(path);
-    fileOutputStream.write(bytes, 0, bytes.length);
-    fileOutputStream.close();
-  }
-
-  public static void writeFile(int size, String path) throws IOException {
-    File file = new File(path);
-    if (file.exists()) {
-      file.delete();
-    }
-    file.createNewFile();
-    byte[] bytes = new byte[size];
-    for (int i = 0; i < size; i++) {
-      bytes[i] = (byte) (i % 200 + 1);
-    }
-    FileOutputStream fileOutputStream = new FileOutputStream(path);
-    fileOutputStream.write(bytes, 0, size);
-    fileOutputStream.close();
-  }
-}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 2fb20f8..3c6a837 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -46,7 +46,7 @@ public class ChunkWriterImpl implements IChunkWriter {
   private ICompressor compressor;
 
   /**
-   * all pages of this column.
+   * all pages of this chunk.
    */
   private PublicBAOS pageBuffer;
 
@@ -172,13 +172,13 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   /**
-   * check occupied memory size, if it exceeds the PageSize threshold, flush 
them to given
-   * OutputStream.
+   * check occupied memory size, if it exceeds the PageSize threshold, 
construct a page and 
+   * put it to pageBuffer
    */
   private void checkPageSizeAndMayOpenANewPage() {
     if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
       logger.debug("current line count reaches the upper bound, write page 
{}", measurementSchema);
-      writePage();
+      writePageToPageBuffer();
     } else if (pageWriter.getPointNumber()
         >= valueCountInOnePageForNextCheck) { // need to check memory size
       // not checking the memory used for every value
@@ -189,7 +189,7 @@ public class ChunkWriterImpl implements IChunkWriter {
             "enough size, write page {}, pageSizeThreshold:{}, 
currentPateSize:{}, valueCountInOnePage:{}",
             measurementSchema.getMeasurementId(), pageSizeThreshold, 
currentPageSize,
             pageWriter.getPointNumber());
-        writePage();
+        writePageToPageBuffer();
         valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
       } else {
         // reset the valueCountInOnePageForNextCheck for the next page
@@ -199,7 +199,7 @@ public class ChunkWriterImpl implements IChunkWriter {
     }
   }
 
-  private void writePage() {
+  private void writePageToPageBuffer() {
     try {
       pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer);
 
@@ -241,7 +241,7 @@ public class ChunkWriterImpl implements IChunkWriter {
   @Override
   public void sealCurrentPage() {
     if (pageWriter.getPointNumber() > 0) {
-      writePage();
+      writePageToPageBuffer();
     }
   }
 
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index b7f1905..e477b54 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -258,15 +258,15 @@ public class PageWriter {
     header.serializeTo(pageBuffer);
 
     // write page content to temp PBAOS
-    try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
-      logger.debug("start to flush a page data into buffer, buffer position {} 
", pageBuffer.size());
-      if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+    logger.debug("start to flush a page data into buffer, buffer position {} 
", pageBuffer.size());
+    if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+      try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
         channel.write(pageData);
-      } else {
-        pageBuffer.write(compressedBytes, compressedPosition, compressedSize);
       }
-      logger.debug("start to flush a page data into buffer, buffer position {} 
", pageBuffer.size());
+    } else {
+      pageBuffer.write(compressedBytes, compressedPosition, compressedSize);
     }
+    logger.debug("start to flush a page data into buffer, buffer position {} 
", pageBuffer.size());
   }
 
   /**

Reply via email to