This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c526024 [IOTDB-514] Extend aggregation (#833)
c526024 is described below
commit c526024ecd60c347544435c8c2178042c33b591c
Author: 张宇欣 <[email protected]>
AuthorDate: Tue Feb 25 09:01:30 2020 +0800
[IOTDB-514] Extend aggregation (#833)
* add merge function to AggregateResult and add timestamp attribute for
FirstValueAggrResult and LastValueAggrResult.
---
.../SystemDesign/1-TsFile/3-Write.md | 8 +-
.../db/query/aggregation/AggregateResult.java | 6 +
.../db/query/aggregation/impl/AvgAggrResult.java | 6 +
.../db/query/aggregation/impl/CountAggrResult.java | 8 +-
.../aggregation/impl/FirstValueAggrResult.java | 22 ++-
.../aggregation/impl/LastValueAggrResult.java | 23 ++-
.../query/aggregation/impl/MaxTimeAggrResult.java | 8 +
.../query/aggregation/impl/MaxValueAggrResult.java | 5 +
.../query/aggregation/impl/MinTimeAggrResult.java | 8 +
.../query/aggregation/impl/MinValueAggrResult.java | 8 +
.../db/query/aggregation/impl/SumAggrResult.java | 6 +
.../db/query/aggregation/AggregateResultTest.java | 183 +++++++++++++++++++++
.../iotdb/db/query/component/SimpleFileWriter.java | 54 ------
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 14 +-
.../apache/iotdb/tsfile/write/page/PageWriter.java | 12 +-
15 files changed, 290 insertions(+), 81 deletions(-)
diff --git a/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
b/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
index fe2fd7f..7a27f66 100644
--- a/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
+++ b/docs/Documentation-CHN/SystemDesign/1-TsFile/3-Write.md
@@ -25,7 +25,7 @@
TsFile 的写入流程如下图所示:
-<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto;
margin-right:auto; display:block;"
src="https://user-images.githubusercontent.com/19167280/73625238-efba2980-467e-11ea-927e-a7021f8153af.png">
+< img style="width:100%; max-width:800px; max-height:600px; margin-left:auto;
margin-right:auto; display:block;"
src="https://user-images.githubusercontent.com/19167280/73625238-efba2980-467e-11ea-927e-a7021f8153af.png">
其中,每个设备对应一个 ChunkGroupWriter,每个传感器对应一个 ChunkWriter。
@@ -41,11 +41,11 @@ TsFile 文件层的写入接口有两种
* TsFileWriter.write(TSRecord record)
- 写入一个设备一个时间戳多个测点。
+ 写入一个设备一个时间戳多个测点。
* TsFileWriter.write(RowBatch rowBatch)
- 写入一个设备多个时间戳多个测点。
+ 写入一个设备多个时间戳多个测点。
当调用 write 接口时,这个设备的数据会交给对应的 ChunkGroupWriter,其中的每个测点会交给对应的 ChunkWriter
进行写入。ChunkWriter 完成编码和打包(生成 Page)。
@@ -62,4 +62,4 @@ TsFile 文件层的写入接口有两种
* TsFileWriter.close()
-根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部,最后关闭文件。
+根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部,最后关闭文件。
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7ada955..e24606b 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -94,6 +94,11 @@ public abstract class AggregateResult {
*/
public abstract boolean isCalculatedAggregationResult();
+ /**
+ * Merge another aggregateResult into this
+ */
+ public abstract void merge(AggregateResult another);
+
public void reset() {
hasResult = false;
}
@@ -209,4 +214,5 @@ public abstract class AggregateResult {
protected boolean hasResult() {
return hasResult;
}
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index cec12fd..dbc677e 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -112,4 +112,10 @@ public class AvgAggrResult extends AggregateResult {
public boolean isCalculatedAggregationResult() {
return false;
}
+
+ @Override
+ public void merge(AggregateResult another) {
+ AvgAggrResult anotherAvg = (AvgAggrResult) another;
+ avg = (avg * cnt + anotherAvg.avg * anotherAvg.cnt) / (cnt +
anotherAvg.cnt);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 0e855da..34ffabb 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
-public class CountAggrResult extends AggregateResult {
+ public class CountAggrResult extends AggregateResult {
public CountAggrResult() {
super(TSDataType.INT64);
@@ -86,4 +86,10 @@ public class CountAggrResult extends AggregateResult {
public boolean isCalculatedAggregationResult() {
return false;
}
+
+ @Override
+ public void merge(AggregateResult another) {
+ CountAggrResult anotherCount = (CountAggrResult) another;
+ setLongValue(anotherCount.getResult() + this.getResult());
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index c86e078..e8f06fc 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
public class FirstValueAggrResult extends AggregateResult {
+ // timestamp of current value
+ private long timestamp = Long.MAX_VALUE;
+
public FirstValueAggrResult(TSDataType dataType) {
super(dataType);
reset();
@@ -40,17 +43,14 @@ public class FirstValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultFromStatistics(Statistics statistics)
- throws QueryProcessException {
+ public void updateResultFromStatistics(Statistics statistics) {
if (hasResult()) {
return;
}
Object firstVal = statistics.getFirstValue();
- if (firstVal == null) {
- throw new QueryProcessException("ChunkMetaData contains no FIRST value");
- }
setValue(firstVal);
+ timestamp = statistics.getStartTime();
}
@Override
@@ -60,6 +60,7 @@ public class FirstValueAggrResult extends AggregateResult {
}
if (dataInThisPage.hasCurrent()) {
setValue(dataInThisPage.currentValue());
+ timestamp = dataInThisPage.currentTime();
}
}
@@ -70,6 +71,7 @@ public class FirstValueAggrResult extends AggregateResult {
}
if (dataInThisPage.hasCurrent() && dataInThisPage.currentTime() < bound) {
setValue(dataInThisPage.currentValue());
+ timestamp = dataInThisPage.currentTime();
dataInThisPage.next();
}
}
@@ -85,6 +87,7 @@ public class FirstValueAggrResult extends AggregateResult {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
setValue(value);
+ timestamp = timestamps[i];
break;
}
}
@@ -94,4 +97,13 @@ public class FirstValueAggrResult extends AggregateResult {
public boolean isCalculatedAggregationResult() {
return hasResult();
}
+
+ @Override
+ public void merge(AggregateResult another) {
+ FirstValueAggrResult anotherFirst = (FirstValueAggrResult) another;
+ if(this.getValue() == null || this.timestamp > anotherFirst.timestamp){
+ this.setValue( anotherFirst.getValue() );
+ this.timestamp = anotherFirst.timestamp;
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index f68bdf3..9154545 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -28,6 +28,9 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
public class LastValueAggrResult extends AggregateResult {
+ //timestamp of current value
+ private long timestamp = Long.MIN_VALUE;
+
public LastValueAggrResult(TSDataType dataType) {
super(dataType);
reset();
@@ -42,6 +45,7 @@ public class LastValueAggrResult extends AggregateResult {
public void updateResultFromStatistics(Statistics statistics) {
Object lastVal = statistics.getLastValue();
setValue(lastVal);
+ timestamp = statistics.getEndTime();
}
@Override
@@ -51,7 +55,7 @@ public class LastValueAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(BatchData dataInThisPage, long bound) {
- long time = -1;
+ long time = Long.MIN_VALUE;
Object lastVal = null;
while (dataInThisPage.hasCurrent() && dataInThisPage.currentTime() <
bound) {
time = dataInThisPage.currentTime();
@@ -59,8 +63,9 @@ public class LastValueAggrResult extends AggregateResult {
dataInThisPage.next();
}
- if (time != -1) {
+ if (time != Long.MIN_VALUE) {
setValue(lastVal);
+ timestamp = time;
}
}
@@ -68,7 +73,7 @@ public class LastValueAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(long[] timestamps, int length,
IReaderByTimestamp dataReader) throws IOException {
- long time = -1;
+ long time = Long.MIN_VALUE;
Object lastVal = null;
for (int i = 0; i < length; i++) {
Object value = dataReader.getValueInTimestamp(timestamps[i]);
@@ -77,8 +82,9 @@ public class LastValueAggrResult extends AggregateResult {
lastVal = value;
}
}
- if (time != -1) {
+ if (time != Long.MIN_VALUE) {
setValue(lastVal);
+ timestamp = time;
}
}
@@ -87,4 +93,13 @@ public class LastValueAggrResult extends AggregateResult {
return false;
}
+ @Override
+ public void merge(AggregateResult another) {
+ LastValueAggrResult anotherFirst = (LastValueAggrResult) another;
+ if(this.getValue() == null || this.timestamp < anotherFirst.timestamp){
+ this.setValue( anotherFirst.getValue() );
+ this.timestamp = anotherFirst.timestamp;
+ }
+ }
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 0b64fa7..530a1af 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -85,6 +85,14 @@ public class MaxTimeAggrResult extends AggregateResult {
return false;
}
+ @Override
+ public void merge(AggregateResult another) {
+ MaxTimeAggrResult anotherMaxTime = (MaxTimeAggrResult) another;
+ if (anotherMaxTime.getResult() != null) {
+ this.updateMaxTimeResult(anotherMaxTime.getResult());
+ }
+ }
+
private void updateMaxTimeResult(long value) {
if (!hasResult() || value >= getLongValue()) {
setLongValue(value);
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 8413ffc..ff360a2 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -83,6 +83,11 @@ public class MaxValueAggrResult extends AggregateResult {
return false;
}
+ @Override
+ public void merge(AggregateResult another) {
+ this.updateResult((Comparable<Object>)another.getResult());
+ }
+
private void updateResult(Comparable<Object> maxVal) {
if (maxVal == null) {
return;
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index d2346f7..5c3ac9d 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -82,4 +82,12 @@ public class MinTimeAggrResult extends AggregateResult {
return hasResult();
}
+ @Override
+ public void merge(AggregateResult another) {
+ MinTimeAggrResult anotherMinTime = (MinTimeAggrResult) another;
+ if (hasResult() && anotherMinTime.hasResult() && getResult() >
anotherMinTime.getResult()) {
+ setLongValue(anotherMinTime.getResult());
+ }
+ }
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 338c16f..525d860 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -78,6 +78,14 @@ public class MinValueAggrResult extends AggregateResult {
return false;
}
+ @Override
+ public void merge(AggregateResult another) {
+ if (another.getResult() != null) {
+ Object value = another.getResult();
+ this.updateResult((Comparable<Object>) value);
+ }
+ }
+
private void updateResult(Comparable<Object> minVal) {
if (minVal == null) {
return;
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index bc1fbec..f21fe04 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -104,4 +104,10 @@ public class SumAggrResult extends AggregateResult {
public boolean isCalculatedAggregationResult() {
return false;
}
+
+ @Override
+ public void merge(AggregateResult another) {
+ SumAggrResult anotherSum = (SumAggrResult) another;
+ setDoubleValue(getDoubleValue() + anotherSum.getDoubleValue());
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
new file mode 100644
index 0000000..698b943
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.factory.AggreResultFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AggregateResultTest {
+
+ @Test
+ public void avgAggrResultTest() throws QueryProcessException {
+ AggregateResult avgAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.AVG, TSDataType.DOUBLE);
+ AggregateResult avgAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.AVG, TSDataType.DOUBLE);
+
+ Statistics statistics = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics.update(1l,1d);
+
+ avgAggrResult1.updateResultFromStatistics(statistics);
+ avgAggrResult2.updateResultFromStatistics(statistics);
+ avgAggrResult1.merge(avgAggrResult2);
+ Assert.assertEquals(1d, (double)avgAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void maxValueAggrResultTest() throws QueryProcessException {
+ AggregateResult maxValueAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_VALUE,
TSDataType.DOUBLE);
+ AggregateResult maxValueAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_VALUE,
TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(1l,2d);
+
+ maxValueAggrResult1.updateResultFromStatistics(statistics1);
+ maxValueAggrResult2.updateResultFromStatistics(statistics2);
+ maxValueAggrResult1.merge(maxValueAggrResult2);
+
+ Assert.assertEquals(2d, (double)maxValueAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void maxTimeAggrResultTest() throws QueryProcessException {
+ AggregateResult maxTimeAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.DOUBLE);
+ AggregateResult maxTimeAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(2l,1d);
+
+ maxTimeAggrResult1.updateResultFromStatistics(statistics1);
+ maxTimeAggrResult2.updateResultFromStatistics(statistics2);
+ maxTimeAggrResult1.merge(maxTimeAggrResult2);
+
+ Assert.assertEquals(2l, (long)maxTimeAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void minValueAggrResultTest() throws QueryProcessException {
+ AggregateResult minValueAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_VALUE,
TSDataType.DOUBLE);
+ AggregateResult minValueAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_VALUE,
TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(1l,2d);
+
+ minValueAggrResult1.updateResultFromStatistics(statistics1);
+ minValueAggrResult2.updateResultFromStatistics(statistics2);
+ minValueAggrResult1.merge(minValueAggrResult2);
+
+ Assert.assertEquals(1d, (double)minValueAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void minTimeAggrResultTest() throws QueryProcessException {
+ AggregateResult minTimeAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.DOUBLE);
+ AggregateResult minTimeAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(2l,1d);
+
+ minTimeAggrResult1.updateResultFromStatistics(statistics1);
+ minTimeAggrResult2.updateResultFromStatistics(statistics2);
+ minTimeAggrResult1.merge(minTimeAggrResult2);
+
+ Assert.assertEquals(1l, (long)minTimeAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void countAggrResultTest() throws QueryProcessException {
+ AggregateResult countAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.COUNT, TSDataType.INT64);
+ AggregateResult countAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.COUNT, TSDataType.INT64);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.INT64);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.INT64);
+ statistics1.update(1l, 1l);
+ statistics2.update(1l,1l);
+
+ countAggrResult1.updateResultFromStatistics(statistics1);
+ countAggrResult2.updateResultFromStatistics(statistics2);
+ countAggrResult1.merge(countAggrResult2);
+
+ Assert.assertEquals(2, (long)countAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void sumAggrResultTest() throws QueryProcessException {
+ AggregateResult sumAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.SUM, TSDataType.DOUBLE);
+ AggregateResult sumAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.SUM, TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(1l,2d);
+
+ sumAggrResult1.updateResultFromStatistics(statistics1);
+ sumAggrResult2.updateResultFromStatistics(statistics2);
+ sumAggrResult1.merge(sumAggrResult2);
+
+ Assert.assertEquals(3d, (double)sumAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void firstValueAggrResultTest() throws QueryProcessException {
+ AggregateResult firstValueAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.FIRST_VALUE,
TSDataType.DOUBLE);
+ AggregateResult firstValueAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.FIRST_VALUE,
TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(2l,2d);
+
+ firstValueAggrResult1.updateResultFromStatistics(statistics1);
+ firstValueAggrResult2.updateResultFromStatistics(statistics2);
+ firstValueAggrResult1.merge(firstValueAggrResult2);
+
+ Assert.assertEquals(1d, (double)firstValueAggrResult1.getResult(), 0.01);
+ }
+
+ @Test
+ public void lastValueAggrResultTest() throws QueryProcessException {
+ AggregateResult lastValueAggrResult1 =
AggreResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE,
TSDataType.DOUBLE);
+ AggregateResult lastValueAggrResult2 =
AggreResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE,
TSDataType.DOUBLE);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1l, 1d);
+ statistics2.update(2l,2d);
+
+ lastValueAggrResult1.updateResultFromStatistics(statistics1);
+ lastValueAggrResult2.updateResultFromStatistics(statistics2);
+ lastValueAggrResult1.merge(lastValueAggrResult2);
+
+ Assert.assertEquals(2d, (double)lastValueAggrResult1.getResult(), 0.01);
+ }
+
+
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/query/component/SimpleFileWriter.java
b/server/src/test/java/org/apache/iotdb/db/query/component/SimpleFileWriter.java
deleted file mode 100644
index d7e114f..0000000
---
a/server/src/test/java/org/apache/iotdb/db/query/component/SimpleFileWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.component;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.junit.Ignore;
-
-@Ignore
-public class SimpleFileWriter {
-
- public static void writeFile(String path, byte[] bytes) throws IOException {
- File file = new File(path);
- if (file.exists()) {
- file.delete();
- }
- file.createNewFile();
- FileOutputStream fileOutputStream = new FileOutputStream(path);
- fileOutputStream.write(bytes, 0, bytes.length);
- fileOutputStream.close();
- }
-
- public static void writeFile(int size, String path) throws IOException {
- File file = new File(path);
- if (file.exists()) {
- file.delete();
- }
- file.createNewFile();
- byte[] bytes = new byte[size];
- for (int i = 0; i < size; i++) {
- bytes[i] = (byte) (i % 200 + 1);
- }
- FileOutputStream fileOutputStream = new FileOutputStream(path);
- fileOutputStream.write(bytes, 0, size);
- fileOutputStream.close();
- }
-}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 2fb20f8..3c6a837 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -46,7 +46,7 @@ public class ChunkWriterImpl implements IChunkWriter {
private ICompressor compressor;
/**
- * all pages of this column.
+ * all pages of this chunk.
*/
private PublicBAOS pageBuffer;
@@ -172,13 +172,13 @@ public class ChunkWriterImpl implements IChunkWriter {
}
/**
- * check occupied memory size, if it exceeds the PageSize threshold, flush
them to given
- * OutputStream.
+ * check occupied memory size, if it exceeds the PageSize threshold,
construct a page and
+ * put it to pageBuffer
*/
private void checkPageSizeAndMayOpenANewPage() {
if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
logger.debug("current line count reaches the upper bound, write page
{}", measurementSchema);
- writePage();
+ writePageToPageBuffer();
} else if (pageWriter.getPointNumber()
>= valueCountInOnePageForNextCheck) { // need to check memory size
// not checking the memory used for every value
@@ -189,7 +189,7 @@ public class ChunkWriterImpl implements IChunkWriter {
"enough size, write page {}, pageSizeThreshold:{},
currentPateSize:{}, valueCountInOnePage:{}",
measurementSchema.getMeasurementId(), pageSizeThreshold,
currentPageSize,
pageWriter.getPointNumber());
- writePage();
+ writePageToPageBuffer();
valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
} else {
// reset the valueCountInOnePageForNextCheck for the next page
@@ -199,7 +199,7 @@ public class ChunkWriterImpl implements IChunkWriter {
}
}
- private void writePage() {
+ private void writePageToPageBuffer() {
try {
pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer);
@@ -241,7 +241,7 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void sealCurrentPage() {
if (pageWriter.getPointNumber() > 0) {
- writePage();
+ writePageToPageBuffer();
}
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index b7f1905..e477b54 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -258,15 +258,15 @@ public class PageWriter {
header.serializeTo(pageBuffer);
// write page content to temp PBAOS
- try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
- logger.debug("start to flush a page data into buffer, buffer position {}
", pageBuffer.size());
- if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+ logger.debug("start to flush a page data into buffer, buffer position {}
", pageBuffer.size());
+ if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
channel.write(pageData);
- } else {
- pageBuffer.write(compressedBytes, compressedPosition, compressedSize);
}
- logger.debug("start to flush a page data into buffer, buffer position {}
", pageBuffer.size());
+ } else {
+ pageBuffer.write(compressedBytes, compressedPosition, compressedSize);
}
+ logger.debug("start to flush a page data into buffer, buffer position {}
", pageBuffer.size());
}
/**