This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new a6e71f5 modify for pr review suggestion
a6e71f5 is described below
commit a6e71f5b907f6501a0b72f6a264c4b9ba214bd1b
Author: suyue <[email protected]>
AuthorDate: Fri Mar 15 23:19:10 2019 +0800
modify for pr review suggestion
---
.../db/query/aggregation/AggreFuncFactory.java | 4 +-
.../db/query/aggregation/AggregateFunction.java | 64 +++++-----
.../db/query/aggregation/impl/CountAggrFunc.java | 27 +++-
.../db/query/aggregation/impl/FirstAggrFunc.java | 18 +++
.../db/query/aggregation/impl/LastAggrFunc.java | 22 +++-
.../db/query/aggregation/impl/MaxTimeAggrFunc.java | 41 ++++--
.../query/aggregation/impl/MaxValueAggrFunc.java | 32 +++--
.../db/query/aggregation/impl/MeanAggrFunc.java | 21 +++-
.../db/query/aggregation/impl/MinTimeAggrFunc.java | 17 +++
.../query/aggregation/impl/MinValueAggrFunc.java | 21 +++-
.../db/query/aggregation/impl/SumAggrFunc.java | 102 +--------------
.../iotdb/db/query/dataset/AggregateDataSet.java | 138 ---------------------
.../db/query/dataset/BatchDataPointReader.java | 54 ++++++++
.../db/query/executor/AggregateEngineExecutor.java | 38 +++---
.../iotdb/db/query/executor/EngineQueryRouter.java | 3 +-
.../iotdb/db/query/reader/AllDataReader.java | 2 +-
.../query/reader/sequence/SealedTsFilesReader.java | 2 +-
.../db/integration/IoTDBAggregationTestIT.java | 129 ++++++++++++++++++-
.../iotdb/db/query/reader/AllDataReaderTest.java | 2 -
19 files changed, 408 insertions(+), 329 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
index a1db207..d33fb7a 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
@@ -61,11 +61,11 @@ public class AggreFuncFactory {
case AggregationConstant.COUNT:
return new CountAggrFunc();
case AggregationConstant.MEAN:
- return new MeanAggrFunc(dataType);
+ return new MeanAggrFunc(AggregationConstant.MEAN, dataType);
case AggregationConstant.FIRST:
return new FirstAggrFunc(dataType);
case AggregationConstant.SUM:
- return new SumAggrFunc(dataType);
+ return new SumAggrFunc(AggregationConstant.SUM, dataType);
case AggregationConstant.LAST:
return new LastAggrFunc(dataType);
default:
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index ba96f5a..5981b58 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -24,17 +24,15 @@ import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.utils.Binary;
public abstract class AggregateFunction {
- public String name;
- public BatchData resultData;
- public TSDataType dataType;
+ protected String name;
+ protected BatchData resultData;
+ protected TSDataType dataType;
/**
* construct.
@@ -76,9 +74,28 @@ public abstract class AggregateFunction {
public abstract void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader) throws IOException, ProcessorException;
+
+ /**
+ * <p>
+ * Calculate the aggregation with data in unsequenceReader.
+ * </p>
+ *
+ * @param unsequenceReader unsequence data reader
+ */
public abstract void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader)
throws IOException, ProcessorException;
+ /**
+ * <p>
+ * Calculate the aggregation with data whose timestamp is less than bound
in unsequenceReader.
+ * </p>
+ *
+ * @param unsequenceReader unsequence data reader
+ * @param bound the time upper bounder of data in unsequence data reader
+ * @throws IOException TsFile data read exception
+ */
+ public abstract void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException, ProcessorException;
/**
* <p>
@@ -92,6 +109,12 @@ public abstract class AggregateFunction {
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException;
/**
+ * Judge if aggregation results have been calculated.
+ * @return If the aggregation result has been calculated return true, else
return false.
+ */
+ public abstract boolean isCalculatedAggregationResult();
+
+ /**
* <p>
* This method is calculate the group by function.
* </p>
@@ -100,35 +123,4 @@ public abstract class AggregateFunction {
long intervalStart, long intervalEnd,
BatchData data) throws ProcessorException;
- /**
- * Convert a value from string to its real data type and put into return
data.
- */
- public void putValueFromStr(String valueStr) throws ProcessorException {
- try {
- switch (dataType) {
- case INT32:
- resultData.putInt(Integer.parseInt(valueStr));
- break;
- case INT64:
- resultData.putLong(Long.parseLong(valueStr));
- break;
- case BOOLEAN:
- resultData.putBoolean(Boolean.parseBoolean(valueStr));
- break;
- case TEXT:
- resultData.putBinary(new Binary(valueStr));
- break;
- case DOUBLE:
- resultData.putDouble(Double.parseDouble(valueStr));
- break;
- case FLOAT:
- resultData.putFloat(Float.parseFloat(valueStr));
- break;
- default:
- throw new ProcessorException("Unsupported type " + dataType);
- }
- } catch (Exception e) {
- throw new ProcessorException(e.getMessage());
- }
- }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 9c70142..2967495 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -30,9 +30,13 @@ import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CountAggrFunc extends AggregateFunction {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CountAggrFunc.class);
+
public CountAggrFunc() {
super(AggregationConstant.COUNT, TSDataType.INT64);
}
@@ -52,9 +56,8 @@ public class CountAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) {
- System.out.println("PageHeader>>>>>>>>>>>>" + pageHeader.getNumOfValues()
+ " " + pageHeader
- .getMinTimestamp()
- + "," + pageHeader.getMaxTimestamp());
+ LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{},
maxTimeStamp{}",
+ pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(),
pageHeader.getMaxTimestamp());
long preValue = resultData.getLong();
preValue += pageHeader.getNumOfValues();
resultData.setLong(0, preValue);
@@ -99,6 +102,19 @@ public class CountAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException {
+ int cnt = 0;
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ unsequenceReader.next();
+ cnt++;
+ }
+ long preValue = resultData.getLong();
+ preValue += cnt;
+ resultData.setLong(0, preValue);
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
int cnt = 0;
@@ -115,6 +131,11 @@ public class CountAggrFunc extends AggregateFunction {
}
@Override
+ public boolean isCalculatedAggregationResult() {
+ return false;
+ }
+
+ @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index fa6e717..a2e4297 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -101,6 +101,19 @@ public class FirstAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException {
+ if (resultData.length() != 0) {
+ return;
+ }
+ if (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ resultData.putTime(0);
+ resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+ return;
+ }
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
if (resultData.length() != 0) {
@@ -118,6 +131,11 @@ public class FirstAggrFunc extends AggregateFunction {
}
@Override
+ public boolean isCalculatedAggregationResult() {
+ return resultData.length() != 0;
+ }
+
+ @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index 71142a3..4630a58 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -100,6 +100,19 @@ public class LastAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException {
+ TimeValuePair pair = null;
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ pair = unsequenceReader.next();
+ }
+
+ if (pair != null) {
+ updateLastResult(pair.getTimestamp(), pair.getValue().getValue());
+ }
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
@@ -112,12 +125,17 @@ public class LastAggrFunc extends AggregateFunction {
lastVal = value.getValue();
}
}
- if(time != -1){
- updateLastResult(-1, lastVal);
+ if (time != -1) {
+ updateLastResult(time, lastVal);
}
}
@Override
+ public boolean isCalculatedAggregationResult() {
+ return false;
+ }
+
+ @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index cd09d5d..e47aaf7 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -73,13 +73,6 @@ public class MaxTimeAggrFunc extends AggregateFunction {
return;
}
time = dataInThisPage.getTimeByIndex(maxIndex);
- while (unsequenceReader.hasNext()) {
- if (unsequenceReader.current().getTimestamp() <= time) {
- unsequenceReader.next();
- } else {
- break;
- }
- }
if (resultData.length() == 0) {
if (time != -1) {
resultData.putTime(0);
@@ -87,7 +80,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
}
} else {
//has set value
- if (time != -1 && time > resultData.currentTime()) {
+ if (time != -1 && time > resultData.getLong()) {
resultData.setAnObject(0, time);
}
}
@@ -107,12 +100,33 @@ public class MaxTimeAggrFunc extends AggregateFunction {
}
} else {
//has set value
- if (pair != null && pair.getTimestamp() > resultData.currentTime()) {
+ if (pair != null && pair.getTimestamp() > resultData.getLong()) {
+ resultData.setAnObject(0, pair.getTimestamp());
+ }
+ }
+ }
+
+ @Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException {
+ TimeValuePair pair = null;
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ pair = unsequenceReader.next();
+ }
+ if (resultData.length() == 0) {
+ if (pair != null) {
+ resultData.putTime(0);
+ resultData.putAnObject(pair.getTimestamp());
+ }
+ } else {
+ //has set value
+ if (pair != null && pair.getTimestamp() > resultData.getLong()) {
resultData.setAnObject(0, pair.getTimestamp());
}
}
}
+ //TODO Consider how to reverse order in dataReader(EngineReaderByTimeStamp)
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
@@ -124,7 +138,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
}
}
- if(time == -1){
+ if (time == -1) {
return;
}
@@ -132,13 +146,18 @@ public class MaxTimeAggrFunc extends AggregateFunction {
resultData.putTime(0);
resultData.putLong(time);
} else {
- if(resultData.getLong() < time){
+ if (resultData.getLong() < time) {
resultData.setLong(0, time);
}
}
}
@Override
+ public boolean isCalculatedAggregationResult() {
+ return false;
+ }
+
+ @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index 6973485..98d8d83 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -53,7 +52,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
Comparable<Object> maxVal = (Comparable<Object>)
pageHeader.getStatistics().getMax();
if (resultData.length() == 0) {
resultData.putTime(0);
- resultData.putAnObject( maxVal);
+ resultData.putAnObject(maxVal);
} else {
if (maxVal.compareTo(resultData.currentValue()) > 0) {
resultData.setAnObject(0, maxVal);
@@ -111,30 +110,49 @@ public class MaxValueAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException {
+ Comparable<Object> maxVal = null;
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ if (maxVal == null
+ ||
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+ maxVal = (Comparable<Object>)
unsequenceReader.current().getValue().getValue();
+ }
+ unsequenceReader.next();
+ }
+ updateResult(maxVal);
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
Comparable<Object> maxVal = null;
- for (long time : timestamps){
+ for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
- if(value == null){
+ if (value == null) {
continue;
}
- if(maxVal == null || maxVal.compareTo(value.getValue())<0){
+ if (maxVal == null || maxVal.compareTo(value.getValue()) < 0) {
maxVal = (Comparable<Object>) value.getValue();
}
}
updateResult(maxVal);
}
+ @Override
+ public boolean isCalculatedAggregationResult() {
+ return false;
+ }
+
private void updateResult(Comparable<Object> maxVal) {
if (resultData.length() == 0) {
if (maxVal != null) {
resultData.putTime(0);
- resultData.putAnObject( maxVal);
+ resultData.putAnObject(maxVal);
}
} else {
if (maxVal != null && maxVal.compareTo(resultData.currentValue()) > 0) {
- resultData.setAnObject( 0, maxVal);
+ resultData.setAnObject(0, maxVal);
}
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index 202f706..6a17285 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
-import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.utils.TimeValuePair;
@@ -34,12 +33,12 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
public class MeanAggrFunc extends AggregateFunction {
- private double sum = 0.0;
+ protected double sum = 0.0;
private int cnt = 0;
private TSDataType seriesDataType;
- public MeanAggrFunc(TSDataType seriesDataType) {
- super(AggregationConstant.MEAN, TSDataType.DOUBLE);
+ public MeanAggrFunc(String name, TSDataType seriesDataType) {
+ super(name, TSDataType.DOUBLE);
this.seriesDataType = seriesDataType;
}
@@ -119,6 +118,15 @@ public class MeanAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException, ProcessorException {
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ TimeValuePair pair = unsequenceReader.next();
+ updateMean(seriesDataType, pair.getValue().getValue());
+ }
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
for (long time : timestamps) {
@@ -129,6 +137,11 @@ public class MeanAggrFunc extends AggregateFunction {
}
}
+ @Override
+ public boolean isCalculatedAggregationResult() {
+ return false;
+ }
+
@Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index ad87adb..7d8afd7 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -94,6 +94,18 @@ public class MinTimeAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException, ProcessorException {
+ if (resultData.length() > 0) {
+ return;
+ }
+ if (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ resultData.putTime(0);
+ resultData.putLong(unsequenceReader.current().getTimestamp());
+ }
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
if (resultData.length() > 0) {
@@ -110,6 +122,11 @@ public class MinTimeAggrFunc extends AggregateFunction {
}
@Override
+ public boolean isCalculatedAggregationResult() {
+ return resultData.length() > 0;
+ }
+
+ @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index 82cd753..ed030f1 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -111,12 +111,26 @@ public class MinValueAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
+ throws IOException, ProcessorException {
+ Comparable<Object> minVal = null;
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
+ if (minVal == null
+ ||
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+ minVal = (Comparable<Object>)
unsequenceReader.current().getValue().getValue();
+ }
+ unsequenceReader.next();
+ }
+ updateResult(minVal);
+ }
+
+ @Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
Comparable<Object> minVal = null;
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
- if(value == null){
+ if (value == null) {
continue;
}
if (minVal == null || minVal.compareTo(value.getValue()) > 0) {
@@ -126,6 +140,11 @@ public class MinValueAggrFunc extends AggregateFunction {
updateResult(minVal);
}
+ @Override
+ public boolean isCalculatedAggregationResult() {
+ return false;
+ }
+
private void updateResult(Comparable<Object> minVal) {
if (resultData.length() == 0) {
if (minVal != null) {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
index 77a347c..2f2eb2d 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
@@ -19,32 +19,13 @@
package org.apache.iotdb.db.query.aggregation.impl;
-import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.query.aggregation.AggregateFunction;
-import org.apache.iotdb.db.query.aggregation.AggregationConstant;
-import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TsPrimitiveType;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
-public class SumAggrFunc extends AggregateFunction {
-
- private double sum = 0.0;
- private TSDataType seriesDataType;
-
- public SumAggrFunc(TSDataType seriesDataType) {
- super(AggregationConstant.SUM, TSDataType.DOUBLE);
- this.seriesDataType = seriesDataType;
- }
-
- @Override
- public void init() {
+public class SumAggrFunc extends MeanAggrFunc {
+ public SumAggrFunc(String name, TSDataType seriesDataType) {
+ super(name, seriesDataType);
}
@Override
@@ -53,81 +34,4 @@ public class SumAggrFunc extends AggregateFunction {
resultData.putTime(0);
return resultData;
}
-
- @Override
- public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
- sum += pageHeader.getStatistics().getSum();
- }
-
- @Override
- public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
- throws IOException, ProcessorException {
- while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
- Object sumVal = null;
- if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
- sumVal = dataInThisPage.currentValue();
- dataInThisPage.next();
- } else if (dataInThisPage.currentTime() ==
unsequenceReader.current().getTimestamp()) {
- sumVal = unsequenceReader.current().getValue().getValue();
- dataInThisPage.next();
- unsequenceReader.next();
- } else {
- sumVal = unsequenceReader.current().getValue().getValue();
- unsequenceReader.next();
- }
- updateSum(seriesDataType, sumVal);
- }
-
- while (dataInThisPage.hasNext()) {
- updateSum(seriesDataType, dataInThisPage.currentValue());
- dataInThisPage.next();
- }
- }
-
- private void updateSum(TSDataType type, Object sumVal) throws
ProcessorException {
- switch (type) {
- case INT32:
- sum += (int) sumVal;
- break;
- case INT64:
- sum += (long) sumVal;
- break;
- case FLOAT:
- sum += (float) sumVal;
- break;
- case DOUBLE:
- sum += (double) sumVal;
- break;
- case TEXT:
- case BOOLEAN:
- default:
- throw new ProcessorException("Unsupported data type in aggregation
MEAN : " + type);
- }
- }
-
- @Override
- public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
- throws IOException, ProcessorException {
- while (unsequenceReader.hasNext()) {
- TimeValuePair pair = unsequenceReader.next();
- updateSum(seriesDataType, pair.getValue().getValue());
- }
- }
-
- @Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
- if (value != null) {
- updateSum(seriesDataType, value.getValue());
- }
- }
- }
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
- }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggregateDataSet.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggregateDataSet.java
deleted file mode 100644
index 00fe608..0000000
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggregateDataSet.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.dataset;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.PriorityQueue;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-
-public class AggregateDataSet extends QueryDataSet {
- private List<BatchData> readers;
-
- private PriorityQueue<Long> timeHeap;
-
- private List<String> aggres;
-
- /**
- * constructor of EngineDataSetWithoutTimeGenerator.
- *
- * @param paths paths in List structure
- * @param aggres aggregate function name
- * @param dataTypes time series data type
- * @param readers readers in List(IReader) structure
- * @throws IOException IOException
- */
- public AggregateDataSet(List<Path> paths, List<String> aggres,
List<TSDataType> dataTypes,
- List<BatchData> readers)
- throws IOException {
- super(paths, dataTypes);
- this.readers = readers;
- initHeap();
- }
-
- private void initHeap(){
- timeHeap = new PriorityQueue<>();
-
- for (int i = 0; i < readers.size(); i++) {
- BatchData reader = readers.get(i);
- if (reader.hasNext()) {
- timeHeap.add(reader.currentTime());
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return !timeHeap.isEmpty();
- }
-
- @Override
- public RowRecord next() throws IOException {
- long minTime = timeHeapGet();
-
- RowRecord record = new RowRecord(minTime);
-
- for (int i = 0; i < readers.size(); i++) {
- BatchData reader = readers.get(i);
- if (!reader.hasNext()) {
- record.addField(new Field(null));
- } else {
- if (reader.currentTime() == minTime) {
- record.addField(getField(reader, dataTypes.get(i)));
- reader.next();
- if (reader.hasNext()) {
- timeHeap.add(reader.currentTime());
- }
- } else {
- record.addField(new Field(null));
- }
- }
- }
-
- return record;
- }
-
- private Field getField(BatchData batchData, TSDataType dataType) {
- Field field = new Field(dataType);
- switch (dataType) {
- case INT32:
- field.setIntV(batchData.getInt());
- break;
- case INT64:
- field.setLongV(batchData.getLong());
- break;
- case FLOAT:
- field.setFloatV(batchData.getFloat());
- break;
- case DOUBLE:
- field.setDoubleV(batchData.getDouble());
- break;
- case BOOLEAN:
- field.setBoolV(batchData.getBoolean());
- break;
- case TEXT:
- field.setBinaryV(batchData.getBinary());
- break;
- default:
- throw new UnSupportedDataTypeException("UnSupported: " + dataType);
- }
- return field;
- }
-
- private Long timeHeapGet() {
- Long t = timeHeap.peek();
- while (!timeHeap.isEmpty()){
- if(timeHeap.peek() == t){
- timeHeap.poll();
- }
- else {
- break;
- }
- }
- return t;
- }
-}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
new file mode 100644
index 0000000..2fbf4a1
--- /dev/null
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the
License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing
permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class BatchDataPointReader implements IPointReader {
+
+ private BatchData batchData;
+
+ public BatchDataPointReader(BatchData batchData) {
+ this.batchData = batchData;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return batchData.hasNext();
+ }
+
+ @Override
+ public TimeValuePair next() throws IOException {
+ TimeValuePair timeValuePair =
TimeValuePairUtils.getCurrentTimeValuePair(batchData);
+ batchData.next();
+ return timeValuePair;
+ }
+
+ @Override
+ public TimeValuePair current() throws IOException {
+ TimeValuePair timeValuePair =
TimeValuePairUtils.getCurrentTimeValuePair(batchData);
+ return timeValuePair;
+ }
+
+ @Override
+ public void close() throws IOException {
+ //batch data doesn't need to close.
+ }
+}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 21eaf47..d9cf6ec 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -35,7 +35,8 @@ import
org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
-import org.apache.iotdb.db.query.dataset.AggregateDataSet;
+import org.apache.iotdb.db.query.dataset.BatchDataPointReader;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -82,7 +83,7 @@ public class AggregateEngineExecutor {
*
* @param context query context
*/
- public AggregateDataSet executeWithOutTimeGenerator(QueryContext context)
+ public QueryDataSet executeWithOutTimeGenerator(QueryContext context)
throws FileNodeManagerException, IOException, PathErrorException,
ProcessorException {
Filter timeFilter = null;
if (expression != null) {
@@ -141,7 +142,7 @@ public class AggregateEngineExecutor {
while (sequenceReader.hasNext()) {
PageHeader pageHeader = sequenceReader.nextPageHeader();
//judge if overlap with unsequence data
- if (canUseHeader(pageHeader, unSequenceReader)) {
+ if (canUseHeader(function, pageHeader, unSequenceReader)) {
//cal by pageHeader
function.calculateValueFromPageHeader(pageHeader);
sequenceReader.skipPageData();
@@ -149,6 +150,10 @@ public class AggregateEngineExecutor {
//cal by pageData
function.calculateValueFromPageData(sequenceReader.nextBatch(),
unSequenceReader);
}
+
+ if (function.isCalculatedAggregationResult()) {
+ return function.getResult();
+ }
}
//cal with unsequence data
@@ -158,8 +163,9 @@ public class AggregateEngineExecutor {
return function.getResult();
}
- private boolean canUseHeader(PageHeader pageHeader, IPointReader
unSequenceReader)
- throws IOException {
+ private boolean canUseHeader(AggregateFunction function, PageHeader
pageHeader,
+ IPointReader unSequenceReader)
+ throws IOException, ProcessorException {
//if page data is memory data.
if (pageHeader == null) {
return false;
@@ -167,11 +173,12 @@ public class AggregateEngineExecutor {
long minTime = pageHeader.getMinTimestamp();
long maxTime = pageHeader.getMaxTimestamp();
- while (unSequenceReader.hasNext() &&
unSequenceReader.current().getTimestamp() <= maxTime) {
- if (minTime <= unSequenceReader.current().getTimestamp()) {
- return false;
- }
- unSequenceReader.next();
+
+ //cal unsequence data with timestamps between pages.
+ function.calculateValueFromUnsequenceReader(unSequenceReader, minTime);
+
+ if (unSequenceReader.hasNext() &&
unSequenceReader.current().getTimestamp() <= maxTime) {
+ return false;
}
return true;
}
@@ -192,7 +199,7 @@ public class AggregateEngineExecutor {
while (sequenceReader.hasNext()) {
PageHeader pageHeader = sequenceReader.nextPageHeader();
//judge if overlap with unsequence data
- if (canUseHeader(pageHeader, unSequenceReader)) {
+ if (canUseHeader(function, pageHeader, unSequenceReader)) {
//cal by pageHeader
function.calculateValueFromPageHeader(pageHeader);
sequenceReader.skipPageData();
@@ -229,9 +236,10 @@ public class AggregateEngineExecutor {
/**
* execute aggregate function with value filter.
+ *
* @param context query context.
*/
- public AggregateDataSet executeWithTimeGenerator(QueryContext context)
+ public QueryDataSet executeWithTimeGenerator(QueryContext context)
throws FileNodeManagerException, PathErrorException, IOException,
ProcessorException {
QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId,
selectedSeries);
QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId,
expression);
@@ -319,12 +327,14 @@ public class AggregateEngineExecutor {
return readersOfSelectedSeries;
}
- private AggregateDataSet constructDataSet(List<BatchData> batchDataList)
throws IOException {
+ private QueryDataSet constructDataSet(List<BatchData> batchDataList) throws
IOException {
List<TSDataType> dataTypes = new ArrayList<>();
+ List<IPointReader> batchDataPointReaders = new ArrayList<>();
for (BatchData batchData : batchDataList) {
dataTypes.add(batchData.getDataType());
+ batchDataPointReaders.add(new BatchDataPointReader(batchData));
}
- return new AggregateDataSet(selectedSeries, aggres, dataTypes,
batchDataList);
+ return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes,
batchDataPointReaders);
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index b5a996f..c36d5c2 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
-import org.apache.iotdb.db.query.dataset.AggregateDataSet;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -96,7 +95,7 @@ public class EngineQueryRouter {
/**
* execute aggregation query.
*/
- public AggregateDataSet aggregate(List<Path> selectedSeries, List<String>
aggres,
+ public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
IExpression expression)
throws QueryFilterOptimizationException, FileNodeManagerException,
IOException, PathErrorException, ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java
index d277c45..ff6cde3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java
@@ -72,7 +72,7 @@ public class AllDataReader implements IPointReader {
public TimeValuePair next() throws IOException {
//construct batchData, and compare with value in pointReader
while (hasCachedBatchData || batchReader.hasNext()) {
- //if batchData isn't initialization, then initialize it
+ //if batchData isn't initialized, then initialize it
if (!hasCachedBatchData) {
if (batchReader.hasNext()) {
batchData = batchReader.nextBatch();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index 61b2463..094cd0c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -173,7 +173,7 @@ public class SealedTsFilesReader implements IBatchReader,
IAggregateReader {
@Override
public PageHeader nextPageHeader() throws IOException {
- return null;
+ return seriesReader.nextPageHeader();
}
@Override
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
index 2d169e3..1e2906b 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
@@ -84,7 +84,8 @@ public class IoTDBAggregationTestIT {
@Test
public void countTest() throws SQLException {
String[] retArray = new String[]{
- "0,2001,2001,2001,2001"
+ "0,2001,2001,2001,2001",
+ "0,7500,7500,7500,7500"
};
Connection connection = null;
try {
@@ -105,6 +106,23 @@ public class IoTDBAggregationTestIT {
}
Assert.assertEquals(1, cnt);
statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select
count(s0),count(s1),count(s2),count(s3) " +
+ "from root.vehicle.d0");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(count(d0s0))
+ + "," + resultSet.getString(count(d0s1)) + "," +
resultSet.getString(count(d0s2))
+ + "," + resultSet.getString(count(d0s3));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -118,7 +136,8 @@ public class IoTDBAggregationTestIT {
@Test
public void firstTest() throws SQLException {
String[] retArray = new String[]{
- "0,2000,2000,2000.0,2000"
+ "0,2000,2000,2000.0,2000",
+ "0,500,500,500.0,500"
};
Connection connection = null;
try {
@@ -139,6 +158,23 @@ public class IoTDBAggregationTestIT {
}
Assert.assertEquals(1, cnt);
statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select
first(s0),first(s1),first(s2),first(s3) " +
+ "from root.vehicle.d0");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(first(d0s0))
+ + "," + resultSet.getString(first(d0s1)) + "," +
resultSet.getString(first(d0s2))
+ + "," + resultSet.getString(first(d0s3));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -152,7 +188,9 @@ public class IoTDBAggregationTestIT {
@Test
public void lastTest() throws SQLException {
String[] retArray = new String[]{
- "0,8499,8499.0"
+ "0,8499,8499.0",
+ "0,1499,1499.0",
+ "0,2200,2200.0"
};
Connection connection = null;
try {
@@ -172,6 +210,37 @@ public class IoTDBAggregationTestIT {
}
Assert.assertEquals(1, cnt);
statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select last(s0),last(s2) " +
+ "from root.vehicle.d0 where time <= 1600");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(last(d0s0))
+ + "," + resultSet.getString(last(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select last(s0),last(s2) " +
+ "from root.vehicle.d0 where time <= 2200");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(last(d0s0))
+ + "," + resultSet.getString(last(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(3, cnt);
+ statement.close();
+
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -185,7 +254,8 @@ public class IoTDBAggregationTestIT {
@Test
public void maxminTimeTest() throws SQLException {
String[] retArray = new String[]{
- "0,8499,500"
+ "0,8499,500",
+ "0,2499,2000"
};
Connection connection = null;
try {
@@ -205,6 +275,21 @@ public class IoTDBAggregationTestIT {
}
Assert.assertEquals(1, cnt);
statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select max_time(s0),min_time(s2) " +
+ "from root.vehicle.d0 where time <= 2500 and time > 1800");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(max_time(d0s0))
+ + "," + resultSet.getString(min_time(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -218,7 +303,8 @@ public class IoTDBAggregationTestIT {
@Test
public void maxminValueTest() throws SQLException {
String[] retArray = new String[]{
- "0,8499,500.0"
+ "0,8499,500.0",
+ "0,2499,500.0"
};
Connection connection = null;
try {
@@ -238,6 +324,21 @@ public class IoTDBAggregationTestIT {
}
Assert.assertEquals(1, cnt);
statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+ "from root.vehicle.d0 where time < 2500");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(max_value(d0s0))
+ + "," + resultSet.getString(min_value(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -251,7 +352,8 @@ public class IoTDBAggregationTestIT {
@Test
public void meanSumTest() throws SQLException {
String[] retArray = new String[]{
- "0,1.4508E7,7250.374812593703"
+ "0,1.4508E7,7250.374812593703",
+ "0,626750.0,1250.998003992016"
};
Connection connection = null;
try {
@@ -271,6 +373,21 @@ public class IoTDBAggregationTestIT {
}
Assert.assertEquals(1, cnt);
statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select sum(s0),mean(s2)" +
+ "from root.vehicle.d0 where time >= 1000 and time <= 2000");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(sum(d0s0))
+ + "," + resultSet.getString(mean(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java
index 4b16fb4..01d8227 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.query.reader;
-import static org.junit.Assert.*;
-
import java.io.IOException;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.junit.Assert;