This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new 43fe083 group by
new 5b8b3b5 merge
43fe083 is described below
commit 43fe083648739c52c8aedd55d9e9eb35292a845b
Author: suyue <[email protected]>
AuthorDate: Thu Mar 21 13:51:22 2019 +0800
group by
---
.../db/engine/querycontext/ReadOnlyMemChunk.java | 4 +-
.../java/org/apache/iotdb/db/metadata/MTree.java | 2 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 12 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 75 ++---
.../iotdb/db/qp/physical/sys/MetadataPlan.java | 2 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 1 +
.../db/query/aggregation/AggreResultData.java | 192 +++++++++++
.../db/query/aggregation/AggregateFunction.java | 42 ++-
.../db/query/aggregation/impl/CountAggrFunc.java | 78 +++--
.../db/query/aggregation/impl/FirstAggrFunc.java | 78 +++--
.../db/query/aggregation/impl/LastAggrFunc.java | 43 ++-
.../db/query/aggregation/impl/MaxTimeAggrFunc.java | 90 ++----
.../query/aggregation/impl/MaxValueAggrFunc.java | 80 +++--
.../db/query/aggregation/impl/MeanAggrFunc.java | 57 +++-
.../db/query/aggregation/impl/MinTimeAggrFunc.java | 83 +++--
.../query/aggregation/impl/MinValueAggrFunc.java | 87 +++--
.../db/query/aggregation/impl/SumAggrFunc.java | 8 +-
...Reader.java => AggreResultDataPointReader.java} | 18 +-
.../db/query/executor/AggregateEngineExecutor.java | 108 +++----
.../executor/EngineExecutorWithTimeGenerator.java | 44 +--
.../iotdb/db/query/executor/EngineQueryRouter.java | 124 +++++++-
.../iotdb/db/query/executor/GroupByEngine.java | 172 ++++++++++
.../executor/GroupByWithOnlyTimeFilterDataSet.java | 277 ++++++++++++++++
.../executor/GroupByWithValueFilterDataSet.java | 142 +++++++++
.../db/query/factory/SeriesReaderFactory.java | 44 +++
.../apache/iotdb/db/utils/TimeValuePairUtils.java | 27 ++
.../org/apache/iotdb/db/utils/TsPrimitiveType.java | 2 +-
.../iotdb/db/integration/IOTDBGroupByTestIT.java | 351 +++++++++++++++++++++
.../db/integration/IoTDBAggregationTestIT.java | 2 +-
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 2 +-
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 7 +-
.../iotdb/db/query/executor/GroupByEngineTest.java | 124 ++++++++
.../tsfile/read/query/dataset/QueryDataSet.java | 7 +
33 files changed, 1958 insertions(+), 427 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index d42eef7..2bc08cc 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -48,8 +48,8 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter {
/**
* init by TSDataType and TimeValuePairSorter.
*/
-// public ReadOnlyMemChunk(TSDataType dataType, TimeValuePairSorter
memSeries) {
-// this(dataType, memSeries, Collections.emptyMap());
+// public ReadOnlyMemChunk(TSDataType resultDataType, TimeValuePairSorter
memSeries) {
+// this(resultDataType, memSeries, Collections.emptyMap());
// }
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 76c284e..1592636 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -869,7 +869,7 @@ public class MTree implements Serializable {
if (node.isLeaf()) {
if (nodes.length <= idx) {
String nodePath = parent + node;
- List<String> tsRow = new ArrayList<>(4);// get [name,storage
group,dataType,encoding]
+ List<String> tsRow = new ArrayList<>(4);// get [name,storage
group,resultDataType,encoding]
tsRow.add(nodePath);
MeasurementSchema measurementSchema = node.getSchema();
tsRow.add(node.getDataFileName());
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 00ce7ae..6eb69bb 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -190,10 +190,10 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
// }
@Override
- public QueryDataSet groupBy(List<Pair<Path, String>> aggres, IExpression
expression, long unit,
- long origin,
- List<Pair<Long, Long>> intervals, int fetchSize) throws
ProcessorException {
- throw new ProcessorException("not support");
+ public QueryDataSet groupBy(List<Path> paths, List<String> aggres,
IExpression expression, long unit,
+ long origin, List<Pair<Long, Long>> intervals)
+ throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException, PathErrorException, IOException {
+ return new EngineQueryRouter().groupBy(paths, aggres, expression, unit,
origin, intervals);
}
@Override
@@ -516,7 +516,7 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
if (!columnSchema.getType().equals(dataType)
|| !columnSchema.getEncodingType().equals(encoding)) {
throw new ProcessorException(String.format(
- "The dataType or encoding of the last node %s is
conflicting in the storage group %s",
+ "The resultDataType or encoding of the last node %s is
conflicting in the storage group %s",
lastNode, fileNodePath));
}
mManager.addPathToMTree(path.getFullPath(), dataType, encoding,
compressor, props);
@@ -531,7 +531,7 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
if (isNewMeasurement) {
// add time series to schema
fileNodeManager.addTimeSeries(path, dataType, encoding,
compressor, props);
- //TODO fileNodeManager.addTimeSeries(path, dataType, encoding,
compressor, encodingArgs);
+ //TODO fileNodeManager.addTimeSeries(path, resultDataType,
encoding, compressor, encodingArgs);
}
// fileNodeManager.closeOneFileNode(namespacePath);
} catch (FileNodeManagerException e) {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 86ae7f1..7d08808 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -47,16 +48,24 @@ public abstract class QueryProcessExecutor {
public QueryProcessExecutor() {
}
- public QueryDataSet processQuery(PhysicalPlan plan) throws IOException,
FileNodeManagerException, PathErrorException, QueryFilterOptimizationException,
ProcessorException {
+ public QueryDataSet processQuery(PhysicalPlan plan)
+ throws IOException, FileNodeManagerException, PathErrorException,
+ QueryFilterOptimizationException, ProcessorException {
QueryPlan queryPlan = (QueryPlan) plan;
QueryExpression queryExpression =
QueryExpression.create().setSelectSeries(queryPlan.getPaths())
.setExpression(queryPlan.getExpression());
-
- if(plan instanceof AggregationPlan) {
- return aggregate(plan.getPaths(), plan.getAggregations(),
((AggregationPlan) plan).getExpression());
+ if (plan instanceof GroupByPlan) {
+ GroupByPlan groupByPlan = (GroupByPlan) plan;
+ return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
+ groupByPlan.getExpression(), groupByPlan.getUnit(),
groupByPlan.getOrigin(),
+ groupByPlan.getIntervals());
}
+ if (plan instanceof AggregationPlan) {
+ return aggregate(plan.getPaths(), plan.getAggregations(),
+ ((AggregationPlan) plan).getExpression());
+ }
return queryRouter.query(queryExpression);
}
@@ -79,25 +88,22 @@ public abstract class QueryProcessExecutor {
this.fetchSize.set(fetchSize);
}
- public abstract QueryDataSet aggregate(List<Path> paths, List<String>
aggres, IExpression expression)
- throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException, QueryFilterOptimizationException;
+ public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres,
+ IExpression expression) throws ProcessorException, IOException,
PathErrorException,
+ FileNodeManagerException, QueryFilterOptimizationException;
- public abstract QueryDataSet groupBy(List<Pair<Path, String>> aggres,
IExpression expression,
- long unit,
- long origin, List<Pair<Long, Long>> intervals, int fetchSize)
- throws ProcessorException, IOException, PathErrorException;
+ public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres,
+ IExpression expression, long unit, long origin, List<Pair<Long, Long>>
intervals)
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException,
+ QueryFilterOptimizationException;
/**
* executeWithGlobalTimeFilter update command and return whether the
operator is successful.
*
- * @param path
- * : update series seriesPath
- * @param startTime
- * start time in update command
- * @param endTime
- * end time in update command
- * @param value
- * - in type of string
+ * @param path : update series seriesPath
+ * @param startTime start time in update command
+ * @param endTime end time in update command
+ * @param value - in type of string
* @return - whether the operator is successful.
*/
public abstract boolean update(Path path, long startTime, long endTime,
String value)
@@ -106,10 +112,8 @@ public abstract class QueryProcessExecutor {
/**
* executeWithGlobalTimeFilter delete command and return whether the
operator is successful.
*
- * @param paths
- * : delete series paths
- * @param deleteTime
- * end time in delete command
+ * @param paths : delete series paths
+ * @param deleteTime end time in delete command
* @return - whether the operator is successful.
*/
public boolean delete(List<Path> paths, long deleteTime) throws
ProcessorException {
@@ -143,10 +147,8 @@ public abstract class QueryProcessExecutor {
/**
* executeWithGlobalTimeFilter delete command and return whether the
operator is successful.
*
- * @param path
- * : delete series seriesPath
- * @param deleteTime
- * end time in delete command
+ * @param path : delete series seriesPath
+ * @param deleteTime end time in delete command
* @return - whether the operator is successful.
*/
protected abstract boolean delete(Path path, long deleteTime) throws
ProcessorException;
@@ -154,12 +156,9 @@ public abstract class QueryProcessExecutor {
/**
* insert a single value. Only used in test
*
- * @param path
- * seriesPath to be inserted
- * @param insertTime
- * - it's time point but not a range
- * @param value
- * value to be inserted
+ * @param path seriesPath to be inserted
+ * @param insertTime - it's time point but not a range
+ * @param value value to be inserted
* @return - Operate Type.
*/
public abstract int insert(Path path, long insertTime, String value) throws
ProcessorException;
@@ -167,14 +166,10 @@ public abstract class QueryProcessExecutor {
/**
* executeWithGlobalTimeFilter insert command and return whether the
operator is successful.
*
- * @param deviceId
- * deviceId to be inserted
- * @param insertTime
- * - it's time point but not a range
- * @param measurementList
- * measurements to be inserted
- * @param insertValues
- * values to be inserted
+ * @param deviceId deviceId to be inserted
+ * @param insertTime - it's time point but not a range
+ * @param measurementList measurements to be inserted
+ * @param insertValues values to be inserted
* @return - Operate Type.
*/
public abstract int multiInsert(String deviceId, long insertTime,
List<String> measurementList,
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index df0c566..1841c36 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -113,7 +113,7 @@ public class MetadataPlan extends PhysicalPlan {
@Override
public String toString() {
- String ret = String.format("seriesPath: %s\ndataType: %s\nencoding:
%s\nnamespace type: %s\nargs: ", path, dataType, encoding, namespaceType);
+ String ret = String.format("seriesPath: %s\nresultDataType: %s\nencoding:
%s\nnamespace type: %s\nargs: ", path, dataType, encoding, namespaceType);
StringBuilder stringBuilder = new StringBuilder(ret.length()+50);
stringBuilder.append(ret);
for (Map.Entry prop : props.entrySet()) {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 2caac94..290d777 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -242,6 +242,7 @@ public class PhysicalGenerator {
((GroupByPlan) queryPlan).setUnit(queryOperator.getUnit());
((GroupByPlan) queryPlan).setOrigin(queryOperator.getOrigin());
((GroupByPlan) queryPlan).setIntervals(queryOperator.getIntervals());
+ ((GroupByPlan)
queryPlan).setAggregations(queryOperator.getSelectOperator().getAggregations());
} else if (queryOperator.isFill()) {
queryPlan = new FillQueryPlan();
FilterOperator timeFilter = queryOperator.getFilterOperator();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
new file mode 100644
index 0000000..0efae01
--- /dev/null
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class AggreResultData {
+
+ private long timestamp;
+ private TSDataType dataType;
+
+ private boolean booleanRet;
+ private int intRet;
+ private long longRet;
+ private float floatRet;
+ private double doubleRet;
+ private Binary binaryRet;
+
+ private boolean isSetValue;
+ private boolean isSetTime;
+
+ public AggreResultData(TSDataType dataType) {
+ this.dataType = dataType;
+ this.isSetTime = false;
+ this.isSetValue = false;
+ }
+
+ public void reSet() {
+ isSetValue = false;
+ isSetTime = false;
+ }
+
+ public void putTimeAndValue(long timestamp, Object v) {
+ setTimestamp(timestamp);
+ setAnObject((Comparable<?>) v);
+ }
+
+ public Object getValue() {
+ switch (dataType) {
+ case BOOLEAN:
+ return booleanRet;
+ case DOUBLE:
+ return doubleRet;
+ case TEXT:
+ return binaryRet;
+ case FLOAT:
+ return floatRet;
+ case INT32:
+ return intRet;
+ case INT64:
+ return longRet;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ }
+
+ /**
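+ * Set the result value from an object and mark the value as set.
+ *
+ * @param v value to store; it is cast to the boxed type matching the data type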
+ */
+ public void setAnObject(Comparable<?> v) {
+ isSetValue = true;
+ switch (dataType) {
+ case BOOLEAN:
+ booleanRet = (Boolean) v;
+ break;
+ case DOUBLE:
+ doubleRet = (Double) v;
+ break;
+ case TEXT:
+ binaryRet = (Binary) v;
+ break;
+ case FLOAT:
+ floatRet = (Float) v;
+ break;
+ case INT32:
+ intRet = (Integer) v;
+ break;
+ case INT64:
+ longRet = (Long) v;
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ isSetTime = true;
+ this.timestamp = timestamp;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ public boolean isBooleanRet() {
+ return booleanRet;
+ }
+
+ public void setBooleanRet(boolean booleanRet) {
+ this.isSetValue = true;
+ this.booleanRet = booleanRet;
+ }
+
+ public int getIntRet() {
+ return intRet;
+ }
+
+ public void setIntRet(int intRet) {
+ this.isSetValue = true;
+ this.intRet = intRet;
+ }
+
+ public long getLongRet() {
+ return longRet;
+ }
+
+ public void setLongRet(long longRet) {
+ this.isSetValue = true;
+ this.longRet = longRet;
+ }
+
+ public float getFloatRet() {
+ return floatRet;
+ }
+
+ public void setFloatRet(float floatRet) {
+ this.isSetValue = true;
+ this.floatRet = floatRet;
+ }
+
+ public double getDoubleRet() {
+ return doubleRet;
+ }
+
+ public void setDoubleRet(double doubleRet) {
+ this.isSetValue = true;
+ this.doubleRet = doubleRet;
+ }
+
+ public Binary getBinaryRet() {
+ return binaryRet;
+ }
+
+ public void setBinaryRet(Binary binaryRet) {
+ this.isSetValue = true;
+ this.binaryRet = binaryRet;
+ }
+
+ public boolean isSetValue() {
+ return isSetValue;
+ }
+
+ public boolean isSetTime() {
+ return isSetTime;
+ }
+
+ public AggreResultData deepCopy() {
+ AggreResultData aggreResultData = new AggreResultData(this.dataType);
+ if (isSetValue) {
+ aggreResultData.setAnObject((Comparable<?>) this.getValue());
+ }
+ if (isSetTime) {
+ aggreResultData.setTimestamp(this.getTimestamp());
+ }
+ return aggreResultData;
+ }
+}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index 5981b58..bb225e5 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -31,24 +31,24 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
public abstract class AggregateFunction {
protected String name;
- protected BatchData resultData;
- protected TSDataType dataType;
+ protected AggreResultData resultData;
+ protected TSDataType resultDataType;
/**
* construct.
*
* @param name aggregate function name.
- * @param dataType series data type.
+ * @param dataType result data type.
*/
public AggregateFunction(String name, TSDataType dataType) {
this.name = name;
- this.dataType = dataType;
- resultData = new BatchData(dataType, true, true);
+ this.resultDataType = dataType;
+ resultData = new AggreResultData(dataType);
}
public abstract void init();
- public abstract BatchData getResult();
+ public abstract AggreResultData getResult();
/**
* <p>
@@ -74,6 +74,20 @@ public abstract class AggregateFunction {
public abstract void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader) throws IOException, ProcessorException;
+ /**
+ * <p>
+ * If the aggregation could not be calculated using <method>calculateValueFromPageHeader</method>
directly, calculate the
+ * aggregation according to all decompressed data in this page.
+ * </p>
+ *
+ * @param dataInThisPage the data in the DataPage
+ * @param unsequenceReader unsequence data reader
+ * @param bound the upper time bound of data in the unsequence data reader
+ * @throws IOException TsFile data read exception
+ * @throws ProcessorException wrong aggregation method parameter
+ */
+ public abstract void calculateValueFromPageData(BatchData dataInThisPage,
+ IPointReader unsequenceReader, long bound) throws IOException,
ProcessorException;
/**
* <p>
@@ -106,21 +120,21 @@ public abstract class AggregateFunction {
* @throws ProcessorException wrong aggregation method parameter
*/
public abstract void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException;
+ EngineReaderByTimeStamp dataReader) throws IOException;
/**
- * Judge if aggregation results have been calculated.
+ * Judge if aggregation results have been calculated. In other words, return true if the
aggregated result
+ * can be determined without computing the remaining data.
* @return If the aggregation result has been calculated return true, else
return false.
*/
public abstract boolean isCalculatedAggregationResult();
/**
- * <p>
- * This method is calculate the group by function.
- * </p>
+ * Return the data type of the aggregation function's result data.
+ * @return the data type of the result data
*/
- public abstract void calcGroupByAggregation(long partitionStart, long
partitionEnd,
- long intervalStart, long intervalEnd,
- BatchData data) throws ProcessorException;
+ public TSDataType getResultDataType() {
+ return resultDataType;
+ }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 2967495..1964875 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -43,14 +44,13 @@ public class CountAggrFunc extends AggregateFunction {
@Override
public void init() {
- if (resultData.length() == 0) {
- resultData.putTime(0);
- resultData.putLong(0);
- }
+ resultData.reSet();
+ resultData.setTimestamp(0);
+ resultData.setLongRet(0);
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
return resultData;
}
@@ -58,9 +58,9 @@ public class CountAggrFunc extends AggregateFunction {
public void calculateValueFromPageHeader(PageHeader pageHeader) {
LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{},
maxTimeStamp{}",
pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(),
pageHeader.getMaxTimestamp());
- long preValue = resultData.getLong();
+ long preValue = resultData.getLongRet();
preValue += pageHeader.getNumOfValues();
- resultData.setLong(0, preValue);
+ resultData.setLongRet(preValue);
}
@@ -76,29 +76,63 @@ public class CountAggrFunc extends AggregateFunction {
} else {
unsequenceReader.next();
}
- long preValue = resultData.getLong();
+ long preValue = resultData.getLongRet();
preValue += 1;
- resultData.setLong(0, preValue);
+ resultData.setLongRet(preValue);
}
if (dataInThisPage.hasNext()) {
- long preValue = resultData.getLong();
+ long preValue = resultData.getLongRet();
preValue += (dataInThisPage.length() - dataInThisPage.getCurIdx());
- resultData.setLong(0, preValue);
+ resultData.setLongRet(preValue);
+ }
+ }
+
+ @Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException {
+ int cnt = 0;
+ while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+ if (dataInThisPage.currentTime() ==
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ dataInThisPage.next();
+ unsequenceReader.next();
+ } else if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ dataInThisPage.next();
+ } else {
+ if (unsequenceReader.current().getTimestamp() >= bound) {
+ break;
+ }
+ unsequenceReader.next();
+ }
+ cnt++;
}
+
+ while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ dataInThisPage.next();
+ cnt++;
+ }
+ long preValue = resultData.getLongRet();
+ preValue += cnt;
+ resultData.setLongRet(preValue);
}
@Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
- throws IOException, ProcessorException {
+ throws IOException {
int cnt = 0;
while (unsequenceReader.hasNext()) {
unsequenceReader.next();
cnt++;
}
- long preValue = resultData.getLong();
+ long preValue = resultData.getLongRet();
preValue += cnt;
- resultData.setLong(0, preValue);
+ resultData.setLongRet(preValue);
}
@Override
@@ -109,14 +143,14 @@ public class CountAggrFunc extends AggregateFunction {
unsequenceReader.next();
cnt++;
}
- long preValue = resultData.getLong();
+ long preValue = resultData.getLongRet();
preValue += cnt;
- resultData.setLong(0, preValue);
+ resultData.setLongRet(preValue);
}
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
+ EngineReaderByTimeStamp dataReader) throws IOException {
int cnt = 0;
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
@@ -125,19 +159,13 @@ public class CountAggrFunc extends AggregateFunction {
}
}
- long preValue = resultData.getLong();
+ long preValue = resultData.getLongRet();
preValue += cnt;
- resultData.setLong(0, preValue);
+ resultData.setLongRet(preValue);
}
@Override
public boolean isCalculatedAggregationResult() {
return false;
}
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
- }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index a2e4297..99d7a6a 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,17 +40,17 @@ public class FirstAggrFunc extends AggregateFunction {
@Override
public void init() {
-
+ resultData.reSet();
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
return resultData;
}
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
- if (resultData.length() != 0) {
+ if (resultData.isSetTime()) {
return;
}
@@ -57,45 +58,70 @@ public class FirstAggrFunc extends AggregateFunction {
if (firstVal == null) {
throw new ProcessorException("PageHeader contains no FIRST value");
}
- resultData.putTime(0);
- resultData.putAnObject(firstVal);
+ resultData.putTimeAndValue(0, firstVal);
}
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
throws IOException, ProcessorException {
- if (resultData.length() != 0) {
+ if (resultData.isSetTime()) {
return;
}
if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
if (dataInThisPage.currentTime() >=
unsequenceReader.current().getTimestamp()) {
- resultData.putTime(0);
-
resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+ resultData.putTimeAndValue(0,
unsequenceReader.current().getValue().getValue());
unsequenceReader.next();
return;
} else {
- resultData.putTime(0);
- resultData.putAnObject(dataInThisPage.currentValue());
+ resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+ dataInThisPage.next();
return;
}
}
if (dataInThisPage.hasNext()) {
- resultData.putTime(0);
- resultData.putAnObject(dataInThisPage.currentValue());
+ resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+ return;
+ }
+ }
+
+ @Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
+ if (resultData.isSetTime()) {
+ return;
+ }
+ if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+ if (dataInThisPage.currentTime() >=
unsequenceReader.current().getTimestamp()) {
+ if (unsequenceReader.current().getTimestamp() < bound) {
+ resultData.putTimeAndValue(0,
unsequenceReader.current().getValue().getValue());
+ unsequenceReader.next();
+ return;
+ }
+ } else {
+ if (dataInThisPage.currentTime() < bound) {
+ resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+ dataInThisPage.next();
+ return;
+ }
+ }
+ }
+
+ if (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+ dataInThisPage.next();
return;
}
}
@Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
- throws IOException, ProcessorException {
- if (resultData.length() != 0) {
+ throws IOException {
+ if (resultData.isSetTime()) {
return;
}
if (unsequenceReader.hasNext()) {
- resultData.putTime(0);
- resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+ resultData.putTimeAndValue(0,
unsequenceReader.current().getValue().getValue());
return;
}
}
@@ -103,28 +129,26 @@ public class FirstAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
throws IOException {
- if (resultData.length() != 0) {
+ if (resultData.isSetTime()) {
return;
}
if (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
- resultData.putTime(0);
- resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+ resultData.putTimeAndValue(0,
unsequenceReader.current().getValue().getValue());
return;
}
}
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
- if (resultData.length() != 0) {
+ EngineReaderByTimeStamp dataReader) throws IOException {
+ if (resultData.isSetTime()) {
return;
}
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
if (value != null) {
- resultData.putTime(0);
- resultData.putAnObject(value.getValue());
+ resultData.putTimeAndValue(0, value.getValue());
break;
}
}
@@ -132,12 +156,6 @@ public class FirstAggrFunc extends AggregateFunction {
@Override
public boolean isCalculatedAggregationResult() {
- return resultData.length() != 0;
- }
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
+ return resultData.isSetTime();
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index 4630a58..151dba5 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -40,13 +41,13 @@ public class LastAggrFunc extends AggregateFunction {
@Override
public void init() {
-
+ resultData.reSet();
}
@Override
- public BatchData getResult() {
- if (resultData.length() != 0) {
- resultData.setTime(0, 0);
+ public AggreResultData getResult() {
+ if (resultData.isSetTime()) {
+ resultData.setTimestamp(0);
}
return resultData;
}
@@ -60,14 +61,20 @@ public class LastAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
throws IOException, ProcessorException {
+ calculateValueFromPageData(dataInThisPage, unsequenceReader,
Long.MAX_VALUE);
+ }
+
+ @Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
long time = -1;
Object lastVal = null;
- int maxIndex = dataInThisPage.length() - 1;
- if (maxIndex < 0) {
- return;
+ while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ time = dataInThisPage.currentTime();
+ lastVal = dataInThisPage.currentValue();
+ dataInThisPage.next();
}
- time = dataInThisPage.getTimeByIndex(maxIndex);
- lastVal = dataInThisPage.getValueByIndex(maxIndex);
+
while (unsequenceReader.hasNext()) {
if (unsequenceReader.current().getTimestamp() < time) {
unsequenceReader.next();
@@ -114,7 +121,7 @@ public class LastAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
+ EngineReaderByTimeStamp dataReader) throws IOException {
long time = -1;
Object lastVal = null;
@@ -135,20 +142,12 @@ public class LastAggrFunc extends AggregateFunction {
return false;
}
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
- }
-
private void updateLastResult(long time, Object value) {
- if (resultData.length() == 0) {
- resultData.putAnObject(value);
- resultData.putTime(time);
+ if (!resultData.isSetTime()) {
+ resultData.putTimeAndValue(time, value);
} else {
- if (time >= resultData.currentTime()) {
- resultData.setAnObject(0, (Comparable<?>) value);
- resultData.setTime(0, time);
+ if (time >= resultData.getTimestamp()) {
+ resultData.putTimeAndValue(time, value);
}
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index e47aaf7..f6c6fa9 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -40,50 +41,48 @@ public class MaxTimeAggrFunc extends AggregateFunction {
@Override
public void init() {
-
+ resultData.reSet();
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
return resultData;
}
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
long maxTimestamp = pageHeader.getMaxTimestamp();
-
- //has not set value
- if (resultData.length() == 0) {
- resultData.putTime(0);
- resultData.putLong(maxTimestamp);
- return;
- }
-
- if (resultData.getLong() < maxTimestamp) {
- resultData.setLong(0, maxTimestamp);
- }
+ updateMaxTimeResult(0, maxTimestamp);
}
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
throws IOException, ProcessorException {
- long time = -1;
+
int maxIndex = dataInThisPage.length() - 1;
if (maxIndex < 0) {
return;
}
- time = dataInThisPage.getTimeByIndex(maxIndex);
- if (resultData.length() == 0) {
- if (time != -1) {
- resultData.putTime(0);
- resultData.putAnObject(time);
+ long time = dataInThisPage.getTimeByIndex(maxIndex);
+ updateMaxTimeResult(0, time);
+ }
+
+ @Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
+ long time = -1;
+ while (dataInThisPage.hasNext()) {
+ if(dataInThisPage.currentTime() < bound){
+ time = dataInThisPage.currentTime();
+ dataInThisPage.next();
}
- } else {
- //has set value
- if (time != -1 && time > resultData.getLong()) {
- resultData.setAnObject(0, time);
+ else{
+ break;
}
}
+ if (time != -1) {
+ updateMaxTimeResult(0, time);
+ }
}
@Override
@@ -93,16 +92,8 @@ public class MaxTimeAggrFunc extends AggregateFunction {
while (unsequenceReader.hasNext()) {
pair = unsequenceReader.next();
}
- if (resultData.length() == 0) {
- if (pair != null) {
- resultData.putTime(0);
- resultData.putAnObject(pair.getTimestamp());
- }
- } else {
- //has set value
- if (pair != null && pair.getTimestamp() > resultData.getLong()) {
- resultData.setAnObject(0, pair.getTimestamp());
- }
+ if (pair != null) {
+ updateMaxTimeResult(0, pair.getTimestamp());
}
}
@@ -113,23 +104,15 @@ public class MaxTimeAggrFunc extends AggregateFunction {
while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
pair = unsequenceReader.next();
}
- if (resultData.length() == 0) {
- if (pair != null) {
- resultData.putTime(0);
- resultData.putAnObject(pair.getTimestamp());
- }
- } else {
- //has set value
- if (pair != null && pair.getTimestamp() > resultData.getLong()) {
- resultData.setAnObject(0, pair.getTimestamp());
- }
+ if (pair != null) {
+ updateMaxTimeResult(0, pair.getTimestamp());
}
}
//TODO Consider how to reverse order in dataReader(EngineReaderByTimeStamp)
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
+ EngineReaderByTimeStamp dataReader) throws IOException {
long time = -1;
for (int i = 0; i < timestamps.size(); i++) {
TsPrimitiveType value =
dataReader.getValueInTimestamp(timestamps.get(i));
@@ -141,15 +124,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
if (time == -1) {
return;
}
-
- if (resultData.length() == 0) {
- resultData.putTime(0);
- resultData.putLong(time);
- } else {
- if (resultData.getLong() < time) {
- resultData.setLong(0, time);
- }
- }
+ updateMaxTimeResult(0, time);
}
@Override
@@ -157,9 +132,10 @@ public class MaxTimeAggrFunc extends AggregateFunction {
return false;
}
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
+ private void updateMaxTimeResult(long time, long value) {
+ if (!resultData.isSetValue() || value >= resultData.getLongRet()) {
+ resultData.setTimestamp(time);
+ resultData.setLongRet(value);
+ }
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index 98d8d83..ceee209 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,25 +40,18 @@ public class MaxValueAggrFunc extends AggregateFunction {
@Override
public void init() {
-
+ resultData.reSet();
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
return resultData;
}
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
Comparable<Object> maxVal = (Comparable<Object>)
pageHeader.getStatistics().getMax();
- if (resultData.length() == 0) {
- resultData.putTime(0);
- resultData.putAnObject(maxVal);
- } else {
- if (maxVal.compareTo(resultData.currentValue()) > 0) {
- resultData.setAnObject(0, maxVal);
- }
- }
+ updateResult(maxVal);
}
@Override
@@ -96,6 +90,50 @@ public class MaxValueAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
+ Comparable<Object> maxVal = null;
+ while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+ if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue())
< 0) {
+ maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+ }
+ dataInThisPage.next();
+ } else if (dataInThisPage.currentTime() ==
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ if (maxVal == null
+ ||
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+ maxVal = (Comparable<Object>)
unsequenceReader.current().getValue().getValue();
+ }
+ dataInThisPage.next();
+ unsequenceReader.next();
+ } else {
+ if (unsequenceReader.current().getTimestamp() >= bound) {
+ break;
+ }
+ if (maxVal == null
+ ||
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+ maxVal = (Comparable<Object>)
unsequenceReader.current().getValue().getValue();
+ }
+ unsequenceReader.next();
+ }
+ }
+
+ while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) <
0) {
+ maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+ }
+ dataInThisPage.next();
+ }
+ updateResult(maxVal);
+ }
+
+ @Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
throws IOException, ProcessorException {
Comparable<Object> maxVal = null;
@@ -125,7 +163,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
+ EngineReaderByTimeStamp dataReader) throws IOException {
Comparable<Object> maxVal = null;
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
@@ -145,21 +183,11 @@ public class MaxValueAggrFunc extends AggregateFunction {
}
private void updateResult(Comparable<Object> maxVal) {
- if (resultData.length() == 0) {
- if (maxVal != null) {
- resultData.putTime(0);
- resultData.putAnObject(maxVal);
- }
- } else {
- if (maxVal != null && maxVal.compareTo(resultData.currentValue()) > 0) {
- resultData.setAnObject(0, maxVal);
- }
+ if (maxVal == null) {
+ return;
+ }
+ if (!resultData.isSetValue() || maxVal.compareTo(resultData.getValue()) >
0) {
+ resultData.putTimeAndValue(0, maxVal);
}
- }
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index 6a17285..36f7337 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -44,13 +45,16 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void init() {
+ resultData.reSet();
+ sum = 0.0;
+ cnt = 0;
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
if (cnt > 0) {
- resultData.putTime(0);
- resultData.putDouble(sum / cnt);
+ resultData.setTimestamp(0);
+ resultData.setDoubleRet(sum / cnt);
}
return resultData;
}
@@ -86,7 +90,41 @@ public class MeanAggrFunc extends AggregateFunction {
}
}
- private void updateMean(TSDataType type, Object sumVal) throws
ProcessorException {
+ @Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
+ while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+ Object sumVal = null;
+ if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ sumVal = dataInThisPage.currentValue();
+ dataInThisPage.next();
+ } else if (dataInThisPage.currentTime() ==
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ sumVal = unsequenceReader.current().getValue().getValue();
+ dataInThisPage.next();
+ unsequenceReader.next();
+ } else {
+ if (unsequenceReader.current().getTimestamp() >= bound) {
+ break;
+ }
+ sumVal = unsequenceReader.current().getValue().getValue();
+ unsequenceReader.next();
+ }
+ updateMean(seriesDataType, sumVal);
+ }
+
+ while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ updateMean(seriesDataType, dataInThisPage.currentValue());
+ dataInThisPage.next();
+ }
+ }
+
+ private void updateMean(TSDataType type, Object sumVal) throws IOException {
switch (type) {
case INT32:
sum += (int) sumVal;
@@ -103,7 +141,7 @@ public class MeanAggrFunc extends AggregateFunction {
case TEXT:
case BOOLEAN:
default:
- throw new ProcessorException("Unsupported data type in aggregation
MEAN : " + type);
+ throw new IOException("Unsupported data type in aggregation MEAN : " +
type);
}
cnt++;
}
@@ -128,7 +166,7 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
+ EngineReaderByTimeStamp dataReader) throws IOException {
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
if (value != null) {
@@ -141,11 +179,4 @@ public class MeanAggrFunc extends AggregateFunction {
public boolean isCalculatedAggregationResult() {
return false;
}
-
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
- }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index 7d8afd7..f6bc642 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,83 +40,113 @@ public class MinTimeAggrFunc extends AggregateFunction {
@Override
public void init() {
-
+ resultData.reSet();
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
return resultData;
}
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
- if (resultData.length() > 0) {
+ if (resultData.isSetValue()) {
return;
}
long time = pageHeader.getMinTimestamp();
- resultData.putTime(0);
- resultData.putLong(time);
+ resultData.putTimeAndValue(0, time);
}
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
throws IOException, ProcessorException {
- if (resultData.length() > 0) {
+ if (resultData.isSetValue()) {
return;
}
if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
- resultData.putTime(0);
- resultData.putLong(dataInThisPage.currentTime());
+ resultData.setTimestamp(0);
+ resultData.setLongRet(dataInThisPage.currentTime());
} else {
- resultData.putTime(0);
- resultData.putLong(unsequenceReader.current().getTimestamp());
+ resultData.setTimestamp(0);
+ resultData.setLongRet(unsequenceReader.current().getTimestamp());
}
return;
}
if (dataInThisPage.hasNext()) {
- resultData.putTime(0);
- resultData.putLong(dataInThisPage.currentTime());
+ resultData.setTimestamp(0);
+ resultData.setLongRet(dataInThisPage.currentTime());
+ }
+ }
+
+ @Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
+ if (resultData.isSetValue()) {
+ return;
+ }
+
+ if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+ if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ return;
+ }
+ resultData.setTimestamp(0);
+ resultData.setLongRet(dataInThisPage.currentTime());
+ } else {
+ if (unsequenceReader.current().getTimestamp() >= bound) {
+ return;
+ }
+ resultData.setTimestamp(0);
+ resultData.setLongRet(unsequenceReader.current().getTimestamp());
+ }
+ return;
+ }
+
+ if (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ resultData.setTimestamp(0);
+ resultData.setLongRet(dataInThisPage.currentTime());
+ dataInThisPage.next();
}
}
@Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
throws IOException, ProcessorException {
- if (resultData.length() > 0) {
+ if (resultData.isSetValue()) {
return;
}
if (unsequenceReader.hasNext()) {
- resultData.putTime(0);
- resultData.putLong(unsequenceReader.current().getTimestamp());
+ resultData.setTimestamp(0);
+ resultData.setLongRet(unsequenceReader.current().getTimestamp());
}
}
@Override
public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
throws IOException, ProcessorException {
- if (resultData.length() > 0) {
+ if (resultData.isSetValue()) {
return;
}
if (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
- resultData.putTime(0);
- resultData.putLong(unsequenceReader.current().getTimestamp());
+ resultData.setTimestamp(0);
+ resultData.setLongRet(unsequenceReader.current().getTimestamp());
}
}
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
- if (resultData.length() > 0) {
+ EngineReaderByTimeStamp dataReader) throws IOException {
+ if (resultData.isSetValue()) {
return;
}
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
if (value != null) {
- resultData.putTime(0);
- resultData.putLong(time);
+ resultData.setTimestamp(0);
+ resultData.setLongRet(time);
return;
}
}
@@ -123,13 +154,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
@Override
public boolean isCalculatedAggregationResult() {
- return resultData.length() > 0;
- }
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
+ return resultData.isSetValue();
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index ed030f1..3184c33 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,30 +40,23 @@ public class MinValueAggrFunc extends AggregateFunction {
@Override
public void init() {
-
+ resultData.reSet();
}
@Override
- public BatchData getResult() {
+ public AggreResultData getResult() {
return resultData;
}
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
Comparable<Object> minVal = (Comparable<Object>)
pageHeader.getStatistics().getMin();
- if (resultData.length() == 0) {
- resultData.putTime(0);
- resultData.putAnObject(minVal);
- } else {
- if (minVal.compareTo(resultData.currentValue()) < 0) {
- resultData.setAnObject(0, minVal);
- }
- }
+ updateResult(minVal);
}
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
- throws IOException, ProcessorException {
+ throws IOException {
Comparable<Object> minVal = null;
while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
@@ -97,8 +91,53 @@ public class MinValueAggrFunc extends AggregateFunction {
}
@Override
+ public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
+ long bound) throws IOException, ProcessorException {
+ Comparable<Object> minVal = null;
+ while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+ if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ if (minVal == null || minVal.compareTo(dataInThisPage.currentValue())
> 0) {
+ minVal = (Comparable<Object>) dataInThisPage.currentValue();
+ }
+ dataInThisPage.next();
+ } else if (dataInThisPage.currentTime() ==
unsequenceReader.current().getTimestamp()) {
+ if (dataInThisPage.currentTime() >= bound) {
+ break;
+ }
+ if (minVal == null
+ ||
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+ minVal = (Comparable<Object>)
unsequenceReader.current().getValue().getValue();
+ }
+ dataInThisPage.next();
+ unsequenceReader.next();
+ } else {
+ if (unsequenceReader.current().getTimestamp() >= bound) {
+ break;
+ }
+ if (minVal == null
+ ||
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+ minVal = (Comparable<Object>)
unsequenceReader.current().getValue().getValue();
+ }
+ unsequenceReader.next();
+ }
+ }
+
+ while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+ if (minVal == null
+ || minVal.compareTo(dataInThisPage.currentValue()) > 0) {
+ minVal = (Comparable<Object>) dataInThisPage.currentValue();
+ }
+ dataInThisPage.next();
+ }
+ updateResult(minVal);
+ }
+
+ @Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
- throws IOException, ProcessorException {
+ throws IOException {
Comparable<Object> minVal = null;
while (unsequenceReader.hasNext()) {
if (minVal == null
@@ -112,7 +151,7 @@ public class MinValueAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
- throws IOException, ProcessorException {
+ throws IOException {
Comparable<Object> minVal = null;
while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
if (minVal == null
@@ -126,7 +165,7 @@ public class MinValueAggrFunc extends AggregateFunction {
@Override
public void calcAggregationUsingTimestamps(List<Long> timestamps,
- EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException {
+ EngineReaderByTimeStamp dataReader) throws IOException {
Comparable<Object> minVal = null;
for (long time : timestamps) {
TsPrimitiveType value = dataReader.getValueInTimestamp(time);
@@ -146,22 +185,12 @@ public class MinValueAggrFunc extends AggregateFunction {
}
private void updateResult(Comparable<Object> minVal) {
- if (resultData.length() == 0) {
- if (minVal != null) {
- resultData.putTime(0);
- resultData.putAnObject(minVal);
- }
- } else {
- if (minVal != null && minVal.compareTo(resultData.currentValue()) < 0) {
- resultData.setAnObject(0, minVal);
- }
+ if (minVal == null) {
+ return;
+ }
+ if (!resultData.isSetValue() || minVal.compareTo(resultData.getValue()) <
0) {
+ resultData.putTimeAndValue(0, minVal);
}
- }
-
- @Override
- public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
- long intervalEnd, BatchData data) throws ProcessorException {
-
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
index 2f2eb2d..e24641e 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.query.aggregation.impl;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
public class SumAggrFunc extends MeanAggrFunc {
@@ -29,9 +29,9 @@ public class SumAggrFunc extends MeanAggrFunc {
}
@Override
- public BatchData getResult() {
- resultData.putDouble(sum);
- resultData.putTime(0);
+ public AggreResultData getResult() {
+ resultData.setDoubleRet(sum);
+ resultData.setTimestamp(0);
return resultData;
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
similarity index 78%
rename from
iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
rename to
iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
index 2fbf4a1..8af94d0 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
@@ -16,34 +16,34 @@
package org.apache.iotdb.db.query.dataset;
import java.io.IOException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-public class BatchDataPointReader implements IPointReader {
+public class AggreResultDataPointReader implements IPointReader {
- private BatchData batchData;
+ private AggreResultData aggreResultData;
- public BatchDataPointReader(BatchData batchData) {
- this.batchData = batchData;
+ public AggreResultDataPointReader(AggreResultData aggreResultData) {
+ this.aggreResultData = aggreResultData;
}
@Override
public boolean hasNext() throws IOException {
- return batchData.hasNext();
+ return aggreResultData.isSetValue();
}
@Override
public TimeValuePair next() throws IOException {
- TimeValuePair timeValuePair =
TimeValuePairUtils.getCurrentTimeValuePair(batchData);
- batchData.next();
+ TimeValuePair timeValuePair =
TimeValuePairUtils.getCurrentTimeValuePair(aggreResultData);
+ aggreResultData.reSet();
return timeValuePair;
}
@Override
public TimeValuePair current() throws IOException {
- TimeValuePair timeValuePair =
TimeValuePairUtils.getCurrentTimeValuePair(batchData);
+ TimeValuePair timeValuePair =
TimeValuePairUtils.getCurrentTimeValuePair(aggreResultData);
return timeValuePair;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index d5a36fc..8348433 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -29,20 +29,19 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
-import org.apache.iotdb.db.query.dataset.BatchDataPointReader;
+import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -95,7 +94,7 @@ public class AggregateEngineExecutor {
List<IPointReader> readersOfUnSequenceData = new ArrayList<>();
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- //construct AggregateFunction
+ // construct AggregateFunction
TSDataType tsDataType = MManager.getInstance()
.getSeriesType(selectedSeries.get(i).getFullPath());
AggregateFunction function =
AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
@@ -106,7 +105,7 @@ public class AggregateEngineExecutor {
.getQueryDataSource(jobId, selectedSeries.get(i), context);
// sequence reader for sealed tsfile, unsealed tsfile, memory
- SequenceDataReader sequenceReader = null;
+ SequenceDataReader sequenceReader;
if (function instanceof MaxTimeAggrFunc || function instanceof
LastAggrFunc) {
sequenceReader = new
SequenceDataReader(queryDataSource.getSeqDataSource(), timeFilter,
context, true);
@@ -121,18 +120,25 @@ public class AggregateEngineExecutor {
readersOfSequenceData.add(sequenceReader);
readersOfUnSequenceData.add(unSeqMergeReader);
}
-
- List<BatchData> batchDatas = new ArrayList<BatchData>();
+ List<AggreResultData> aggreResultDataList = new ArrayList<>();
//TODO use multi-thread
for (int i = 0; i < selectedSeries.size(); i++) {
- BatchData batchData =
aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
+ AggreResultData aggreResultData =
aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
readersOfSequenceData.get(i), readersOfUnSequenceData.get(i),
timeFilter);
- batchDatas.add(batchData);
+ aggreResultDataList.add(aggreResultData);
}
- return constructDataSet(batchDatas);
+ return constructDataSet(aggreResultDataList);
}
- private BatchData aggregateWithOutTimeGenerator(AggregateFunction function,
+ /**
+ * calculate the aggregate result of one series with only a time filter or no filter.
+ * @param function aggregate function
+ * @param sequenceReader sequence data reader
+ * @param unSequenceReader unsequence data reader
+ * @param filter time filter or null
+ * @return one series aggregate result data
+ */
+ private AggreResultData aggregateWithOutTimeGenerator(AggregateFunction
function,
SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter
filter)
throws IOException, ProcessorException {
if (function instanceof MaxTimeAggrFunc || function instanceof
LastAggrFunc) {
@@ -164,6 +170,9 @@ public class AggregateEngineExecutor {
return function.getResult();
}
+ /**
+ * determine whether pageHeader can be used to compute aggregation results.
+ */
private boolean canUseHeader(AggregateFunction function, PageHeader
pageHeader,
IPointReader unSequenceReader, Filter filter)
throws IOException, ProcessorException {
@@ -198,7 +207,7 @@ public class AggregateEngineExecutor {
* @param unSequenceReader unsequence data reader
* @return BatchData-aggregate result
*/
- private BatchData handleLastMaxTimeWithOutTimeGenerator(AggregateFunction
function,
+ private AggreResultData
handleLastMaxTimeWithOutTimeGenerator(AggregateFunction function,
SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter
timeFilter)
throws IOException, ProcessorException {
long lastBatchTimeStamp = Long.MIN_VALUE;
@@ -252,8 +261,8 @@ public class AggregateEngineExecutor {
QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId,
expression);
EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(jobId,
expression, context);
- List<EngineReaderByTimeStamp> readersOfSelectedSeries =
getReadersOfSelectedPaths(
- selectedSeries, context);
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries = SeriesReaderFactory
+ .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context);
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
@@ -262,12 +271,17 @@ public class AggregateEngineExecutor {
function.init();
aggregateFunctions.add(function);
}
- List<BatchData> batchDatas =
aggregateWithTimeGenerator(aggregateFunctions, timestampGenerator,
+ List<AggreResultData> batchDataList =
aggregateWithTimeGenerator(aggregateFunctions,
+ timestampGenerator,
readersOfSelectedSeries);
- return constructDataSet(batchDatas);
+ return constructDataSet(batchDataList);
}
- private List<BatchData> aggregateWithTimeGenerator(List<AggregateFunction>
aggregateFunctions,
+ /**
+ * calculate the aggregate results with a value filter.
+ */
+ private List<AggreResultData> aggregateWithTimeGenerator(
+ List<AggregateFunction> aggregateFunctions,
EngineTimeGenerator timestampGenerator,
List<EngineReaderByTimeStamp> readersOfSelectedSeries)
throws IOException, ProcessorException {
@@ -288,59 +302,27 @@ public class AggregateEngineExecutor {
aggregateFunctions.get(i)
.calcAggregationUsingTimestamps(timestamps,
readersOfSelectedSeries.get(i));
}
-
}
- List<BatchData> batchDataList = new ArrayList<>();
+ List<AggreResultData> aggreResultDataArrayList = new ArrayList<>();
for (AggregateFunction function : aggregateFunctions) {
- batchDataList.add(function.getResult());
- }
- return batchDataList;
- }
-
- private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path>
paths,
- QueryContext context)
- throws IOException, FileNodeManagerException {
-
- List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
-
- for (Path path : paths) {
-
- QueryDataSource queryDataSource =
QueryDataSourceManager.getQueryDataSource(jobId, path,
- context);
-
- PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new
PriorityMergeReaderByTimestamp();
-
- // reader for sequence data
- SequenceDataReader tsFilesReader = new
SequenceDataReader(queryDataSource.getSeqDataSource(),
- null, context);
-
- // reader for unSequence data
- PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
-
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-
- if (tsFilesReader == null || !tsFilesReader.hasNext()) {
- mergeReaderByTimestamp
- .addReaderWithPriority(unSeqMergeReader,
PriorityMergeReader.HIGH_PRIORITY);
- } else {
- mergeReaderByTimestamp
- .addReaderWithPriority(new AllDataReader(tsFilesReader,
unSeqMergeReader),
- PriorityMergeReader.HIGH_PRIORITY);
- }
-
- readersOfSelectedSeries.add(mergeReaderByTimestamp);
+ aggreResultDataArrayList.add(function.getResult());
}
-
- return readersOfSelectedSeries;
+ return aggreResultDataArrayList;
}
- private QueryDataSet constructDataSet(List<BatchData> batchDataList) throws
IOException {
+ /**
+ * construct a QueryDataSet from the aggregate result data list.
+ * @param aggreResultDataList aggregate result data list
+ */
+ private QueryDataSet constructDataSet(List<AggreResultData>
aggreResultDataList)
+ throws IOException {
List<TSDataType> dataTypes = new ArrayList<>();
- List<IPointReader> batchDataPointReaders = new ArrayList<>();
- for (BatchData batchData : batchDataList) {
- dataTypes.add(batchData.getDataType());
- batchDataPointReaders.add(new BatchDataPointReader(batchData));
+ List<IPointReader> resultDataPointReaders = new ArrayList<>();
+ for (AggreResultData resultData : aggreResultDataList) {
+ dataTypes.add(resultData.getDataType());
+ resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
}
- return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes,
batchDataPointReaders);
+ return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes,
resultDataPointReaders);
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index d5b1740..f6bebfd 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -22,20 +22,14 @@ package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
-import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -73,7 +67,7 @@ public class EngineExecutorWithTimeGenerator {
List<EngineReaderByTimeStamp> readersOfSelectedSeries;
try {
timestampGenerator = new EngineTimeGenerator(jobId,
queryExpression.getExpression(), context);
- readersOfSelectedSeries =
getReadersOfSelectedPaths(queryExpression.getSelectedSeries(),
+ readersOfSelectedSeries =
SeriesReaderFactory.getByTimestampReadersOfSelectedPaths(jobId,
queryExpression.getSelectedSeries(),
context);
} catch (IOException ex) {
throw new FileNodeManagerException(ex);
@@ -94,40 +88,4 @@ public class EngineExecutorWithTimeGenerator {
readersOfSelectedSeries);
}
- private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path>
paths,
- QueryContext context)
- throws IOException, FileNodeManagerException {
-
- List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
-
- for (Path path : paths) {
-
- QueryDataSource queryDataSource =
QueryDataSourceManager.getQueryDataSource(jobId, path,
- context);
-
- PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new
PriorityMergeReaderByTimestamp();
-
- // reader for sequence data
- SequenceDataReader tsFilesReader = new
SequenceDataReader(queryDataSource.getSeqDataSource(),
- null, context);
-
- // reader for unSequence data
- PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
-
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-
- if (tsFilesReader == null || !tsFilesReader.hasNext()) {
- mergeReaderByTimestamp
- .addReaderWithPriority(unSeqMergeReader,
PriorityMergeReader.HIGH_PRIORITY);
- } else {
- mergeReaderByTimestamp
- .addReaderWithPriority(new AllDataReader(tsFilesReader,
unSeqMergeReader),
- PriorityMergeReader.HIGH_PRIORITY);
- }
-
- readersOfSelectedSeries.add(mergeReaderByTimestamp);
- }
-
- return readersOfSelectedSeries;
- }
-
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index c36d5c2..d7f6cf0 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -19,9 +19,10 @@
package org.apache.iotdb.db.query.executor;
-import static
org.apache.iotdb.tsfile.read.expression.ExpressionType.GLOBAL_TIME;
-
import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -32,10 +33,15 @@ import
org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
/**
* Query entrance class of IoTDB query process. All query clause will be
transformed to physical
@@ -68,10 +74,9 @@ public class EngineQueryRouter {
.optimize(queryExpression.getExpression(),
queryExpression.getSelectedSeries());
queryExpression.setExpression(optimizedExpression);
- if (optimizedExpression.getType() == GLOBAL_TIME) {
+ if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
EngineExecutorWithoutTimeGenerator engineExecutor =
new EngineExecutorWithoutTimeGenerator(
-
nextJobId, queryExpression);
return engineExecutor.executeWithGlobalTimeFilter(context);
} else {
@@ -96,8 +101,8 @@ public class EngineQueryRouter {
* execute aggregation query.
*/
public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
- IExpression expression)
- throws QueryFilterOptimizationException, FileNodeManagerException,
IOException, PathErrorException, ProcessorException {
+ IExpression expression) throws QueryFilterOptimizationException,
FileNodeManagerException,
+ IOException, PathErrorException, ProcessorException {
long nextJobId = getNextJobId();
QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
@@ -110,7 +115,7 @@ public class EngineQueryRouter {
.optimize(expression, selectedSeries);
AggregateEngineExecutor engineExecutor = new
AggregateEngineExecutor(nextJobId,
selectedSeries, aggres, optimizedExpression);
- if (optimizedExpression.getType() == GLOBAL_TIME) {
+ if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
return engineExecutor.executeWithOutTimeGenerator(context);
} else {
return engineExecutor.executeWithTimeGenerator(context);
@@ -122,6 +127,111 @@ public class EngineQueryRouter {
}
}
+  /**
+   * Execute a groupBy query.
+   *
+   * <p>The intervals are validated, sorted and merged, turned into a time filter, and AND-ed
+   * with the optional where-clause expression. Depending on the type of the optimized
+   * expression, either a {@link GroupByWithOnlyTimeFilterDataSet} (global time filter only) or
+   * a {@link GroupByWithValueFilterDataSet} is initialized and returned.
+   *
+   * @param selectedSeries select path list
+   * @param aggres aggregation name list
+   * @param expression filter expression of the where clause, may be null
+   * @param unit time granularity for interval partitioning, unit is ms; must be positive.
+   * @param origin the datum time point for interval division: starting from this point, time
+   *     is divided forward and backward into partitions of {@code unit} length.
+   * @param intervals time intervals, closed interval; must not be empty. NOTE: the list is
+   *     sorted in place and its pairs may be widened while merging.
+   * @return the initialized group-by data set
+   * @throws ProcessorException if {@code unit} is not positive, {@code intervals} is null or
+   *     empty, an interval bound is not greater than 0, or an interval ends before it starts
+   * @throws QueryFilterOptimizationException if the combined expression cannot be optimized
+   * @throws FileNodeManagerException propagated from the data set initialization
+   * @throws PathErrorException propagated from the data set initialization
+   * @throws IOException propagated from the data set initialization
+   */
+  public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
+      IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals)
+      throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException,
+      PathErrorException, IOException {
+
+    long nextJobId = getNextJobId();
+    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    QueryContext context = new QueryContext();
+
+    // the partition arithmetic divides by unit, so a non-positive unit cannot be served
+    if (unit <= 0) {
+      throw new ProcessorException(
+          String.format("Group by time unit must be greater than 0, found %d.", unit));
+    }
+    // without any interval there is no interval filter to build
+    if (intervals == null || intervals.isEmpty()) {
+      throw new ProcessorException("Group by requires at least one time interval.");
+    }
+
+    //check the legitimacy of intervals
+    for (Pair<Long, Long> pair : intervals) {
+      if (!(pair.left > 0 && pair.right > 0)) {
+        throw new ProcessorException(
+            String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right));
+      }
+      if (pair.right < pair.left) {
+        // the requirement is start <= end; the message used to state the opposite
+        throw new ProcessorException(String.format(
+            "Interval starting time must be less than or equal to the interval ending time, "
+                + "found error interval<%d, %d>", pair.left, pair.right));
+      }
+    }
+    //merge intervals
+    List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals);
+
+    //construct groupBy intervals filter: (t >= left1 && t <= right1) || (t >= left2 && ...) ...
+    BinaryExpression intervalFilter = null;
+    for (Pair<Long, Long> pair : mergedIntervalList) {
+      BinaryExpression pairFilter = BinaryExpression
+          .and(new GlobalTimeExpression(TimeFilter.gtEq(pair.left)),
+              new GlobalTimeExpression(TimeFilter.ltEq(pair.right)));
+      if (intervalFilter != null) {
+        intervalFilter = BinaryExpression.or(intervalFilter, pairFilter);
+      } else {
+        intervalFilter = pairFilter;
+      }
+    }
+
+    //merge interval filter and filtering conditions after where statements
+    if (expression == null) {
+      expression = intervalFilter;
+    } else {
+      expression = BinaryExpression.and(expression, intervalFilter);
+    }
+
+    IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+        .optimize(expression, selectedSeries);
+    if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+      GroupByWithOnlyTimeFilterDataSet groupByEngine = new GroupByWithOnlyTimeFilterDataSet(
+          nextJobId, selectedSeries, unit, origin, mergedIntervalList);
+      groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+      return groupByEngine;
+    } else {
+      GroupByWithValueFilterDataSet groupByEngine = new GroupByWithValueFilterDataSet(nextJobId,
+          selectedSeries, unit, origin, mergedIntervalList);
+      groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+      return groupByEngine;
+    }
+  }
+
+  /**
+   * sort intervals by start time and merge overlapping intervals.
+   *
+   * <p>NOTE: {@code intervals} is sorted in place, and the pairs placed into the result are
+   * the caller's own objects, whose {@code right} bound may be widened while merging.
+   *
+   * @param intervals time intervals, closed interval
+   * @return the merged intervals in ascending order of start time
+   */
+  private List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) {
+    Collections.sort(intervals, new Comparator<Pair<Long, Long>>() {
+      @Override
+      public int compare(Pair<Long, Long> o1, Pair<Long, Long> o2) {
+        /*sort by interval start time.*/
+        // Long.compare cannot overflow; casting the long difference to int could flip the sign
+        return Long.compare(o1.left, o2.left);
+      }
+    });
+
+    LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
+    for (Pair<Long, Long> interval : intervals) {
+      // if the list of merged intervals is empty or
+      // if the current interval does not overlap with the previous, simply append it.
+      if (merged.isEmpty() || merged.getLast().right < interval.left) {
+        merged.add(interval);
+      }
+      // otherwise, there is overlap, so we merge the current and previous intervals.
+      else {
+        merged.getLast().right = Math.max(merged.getLast().right, interval.right);
+      }
+    }
+    return merged;
+  }
+
private synchronized long getNextJobId() {
return jobIdGenerator.incrementAndGet();
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
new file mode 100644
index 0000000..5430f9e
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public abstract class GroupByEngine extends QueryDataSet {
+
+ protected long jobId;
+ protected List<Path> selectedSeries;
+ private long unit;
+ private long origin;
+ protected List<Pair<Long, Long>> mergedIntervals;
+
+ protected long startTime;
+ protected long endTime;
+ protected int usedIndex;
+ protected List<AggregateFunction> functions;
+ protected boolean hasCachedTimeInterval;
+
+  /**
+   * Creates a groupBy data set over the given paths and resets the time-partition cursor.
+   *
+   * @param jobId id of the query job
+   * @param paths selected series paths
+   * @param unit length of one time partition
+   * @param origin datum time point the partitions are aligned to
+   * @param mergedIntervals query intervals, already sorted and merged
+   */
+  public GroupByEngine(long jobId, List<Path> paths, long unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals) {
+    super(paths);
+    this.selectedSeries = paths;
+    this.jobId = jobId;
+    this.origin = origin;
+    this.unit = unit;
+    this.mergedIntervals = mergedIntervals;
+    this.functions = new ArrayList<>();
+
+    // time-partition cursor: nothing produced yet, start at the first interval
+    this.endTime = -1;
+    this.hasCachedTimeInterval = false;
+    this.usedIndex = 0;
+  }
+
+  /**
+   * Builds and initializes one aggregate function per selected series and registers the
+   * result data types of these functions as the data types of this data set.
+   *
+   * @param context query context (not used by this method)
+   * @param aggres aggregation names, indexed like the selected series
+   * @param expression filter expression (not used by this method)
+   */
+  protected void initAggreFuction(QueryContext context, List<String> aggres,
+      IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+
+    List<TSDataType> resultTypes = new ArrayList<>();
+    for (int idx = 0; idx < paths.size(); idx++) {
+      // the series type selects the concrete implementation of the named aggregation
+      TSDataType seriesType = MManager.getInstance()
+          .getSeriesType(selectedSeries.get(idx).getFullPath());
+      AggregateFunction aggregate = AggreFuncFactory
+          .getAggrFuncByName(aggres.get(idx), seriesType);
+      aggregate.init();
+      functions.add(aggregate);
+      resultTypes.add(aggregate.getResultDataType());
+    }
+    super.setDataTypes(resultTypes);
+  }
+
+  /**
+   * Advances the time-partition cursor if no partition is cached.
+   *
+   * <p>On success, {@code startTime}/{@code endTime} describe the next partition
+   * {@code [startTime, endTime)} and {@code hasCachedTimeInterval} is set.
+   *
+   * @return true if a partition is cached or a new one could be computed, false when all
+   *     merged intervals are covered
+   */
+  @Override
+  public boolean hasNext() {
+    //has cached
+    if (hasCachedTimeInterval) {
+      return true;
+    }
+
+    //skip the intervals in coverage of last time-partition
+    while (usedIndex < mergedIntervals.size() && mergedIntervals.get(usedIndex).right < endTime) {
+      usedIndex++;
+    }
+    //end
+    if (usedIndex >= mergedIntervals.size()) {
+      return false;
+    }
+
+    Pair<Long, Long> interval = mergedIntervals.get(usedIndex);
+
+    //initialize the start-end time of next interval
+    if (endTime < interval.left) {
+      //interval start time
+      startTime = interval.left;
+      if (origin > startTime) {
+        endTime = origin - (origin - startTime) / unit * unit;
+        // startTime lies exactly on a partition boundary: without this step the partition
+        // would be the empty range [startTime, startTime) and be followed by a second row
+        // carrying the same timestamp
+        if (endTime == startTime) {
+          endTime += unit;
+        }
+      } else {
+        endTime = origin + (startTime - origin) / unit * unit + unit;
+      }
+      hasCachedTimeInterval = true;
+      return true;
+    }
+
+    //current interval is not covered yet. Intervals are closed and partitions are half-open,
+    //so endTime == right still leaves the point at 'right' uncovered; '<' here would also end
+    //the iteration although later intervals remain.
+    if (endTime <= interval.right) {
+      startTime = endTime;
+      endTime += unit;
+      hasCachedTimeInterval = true;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * this method is only used in the test class to get the next time partition.
+   *
+   * @return the cached partition as a (startTime, endTime) pair; the cache flag is cleared
+   */
+  public Pair<Long, Long> nextTimePartition() {
+    Pair<Long, Long> partition = new Pair<>(startTime, endTime);
+    hasCachedTimeInterval = false;
+    return partition;
+  }
+
+  /**
+   * Converts an aggregate result into a {@link Field} of the same data type.
+   *
+   * @param aggreResultData aggregate result of one series
+   * @return a field created with a null type when no value is set, otherwise a typed field
+   *     holding the result value
+   * @throws UnSupportedDataTypeException if the result data type is not handled below
+   */
+  protected Field getField(AggreResultData aggreResultData) {
+    if (!aggreResultData.isSetValue()) {
+      return new Field(null);
+    }
+
+    TSDataType resultType = aggreResultData.getDataType();
+    Field result = new Field(resultType);
+    switch (resultType) {
+      case INT32:
+        result.setIntV(aggreResultData.getIntRet());
+        break;
+      case INT64:
+        result.setLongV(aggreResultData.getLongRet());
+        break;
+      case FLOAT:
+        result.setFloatV(aggreResultData.getFloatRet());
+        break;
+      case DOUBLE:
+        result.setDoubleV(aggreResultData.getDoubleRet());
+        break;
+      case BOOLEAN:
+        result.setBoolV(aggreResultData.isBooleanRet());
+        break;
+      case TEXT:
+        result.setBinaryV(aggreResultData.getBinaryRet());
+        break;
+      default:
+        throw new UnSupportedDataTypeException("UnSupported: " + resultType);
+    }
+    return result;
+  }
+
+}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
new file mode 100644
index 0000000..67d72d9
--- /dev/null
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IAggregateReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngine {
+
+ protected List<IPointReader> unSequenceReaderList;
+ protected List<IAggregateReader> sequenceReaderList;
+ private List<BatchData> batchDataList;
+ private List<Boolean> hasCachedSequenceDataList;
+ private Filter timeFilter;
+
+  /**
+   * Creates the data set with empty reader lists and one empty batch cache slot per path.
+   * The readers and the time filter are set up later by {@code initGroupBy}.
+   */
+  public GroupByWithOnlyTimeFilterDataSet(long jobId, List<Path> paths, long unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals) {
+    super(jobId, paths, unit, origin, mergedIntervals);
+    this.timeFilter = null;
+    this.sequenceReaderList = new ArrayList<>();
+    this.unSequenceReaderList = new ArrayList<>();
+    this.batchDataList = new ArrayList<>();
+    this.hasCachedSequenceDataList = new ArrayList<>();
+    // one cache slot per series: no batch cached yet
+    for (int slot = 0; slot < paths.size(); slot++) {
+      batchDataList.add(null);
+      hasCachedSequenceDataList.add(false);
+    }
+  }
+
+  /**
+   * init reader and aggregate function.
+   *
+   * @param context query context handed to the readers
+   * @param aggres aggregation names, indexed like the selected series
+   * @param expression null, or a {@link GlobalTimeExpression} whose filter becomes the time
+   *     filter of every reader
+   */
+  public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+    initAggreFuction(context, aggres, expression);
+
+    //init reader
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries);
+    if (expression != null) {
+      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    }
+
+    for (Path series : selectedSeries) {
+      QueryDataSource dataSource = QueryDataSourceManager
+          .getQueryDataSource(jobId, series, context);
+
+      // sequence reader for sealed tsfile, unsealed tsfile, memory
+      SequenceDataReader seqReader = new SequenceDataReader(dataSource.getSeqDataSource(),
+          timeFilter, context, false);
+
+      // unseq reader for all chunk groups in unSeqFile, memory
+      PriorityMergeReader unseqReader = SeriesReaderFactory.getInstance()
+          .createUnSeqMergeReader(dataSource.getOverflowSeriesDataSource(), timeFilter);
+
+      sequenceReaderList.add(seqReader);
+      unSequenceReaderList.add(unseqReader);
+    }
+  }
+
+  /**
+   * Returns the row of the cached time partition: its timestamp is the partition start time
+   * and it holds one field per aggregate function.
+   *
+   * @throws IOException if no partition is cached (hasNext() was not called), or wrapping a
+   *     ProcessorException raised while aggregating a series
+   */
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException("need to call hasNext() before calling next() in "
+          + "GroupByWithOnlyTimeFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+
+    RowRecord row = new RowRecord(startTime);
+    for (int idx = 0; idx < functions.size(); idx++) {
+      AggreResultData seriesResult;
+      try {
+        seriesResult = nextSeries(idx);
+      } catch (ProcessorException e) {
+        throw new IOException(e);
+      }
+      row.addField(seriesResult == null ? new Field(null) : getField(seriesResult));
+    }
+    return row;
+  }
+
+  /**
+   * Calculates the aggregate result of one series for the current time partition.
+   *
+   * <p>The function is re-initialized, data before {@code startTime} is skipped, then the
+   * cached sequence batch, further sequence pages and the unsequence reader are fed to the
+   * function, bounded by {@code endTime}. A sequence batch that still holds points after
+   * that is cached for the next partition.
+   *
+   * @param idx index of the series in the selected series list
+   * @return a deep copy of the function result for this partition
+   */
+  protected AggreResultData nextSeries(int idx) throws IOException, ProcessorException {
+    IPointReader unsequenceReader = unSequenceReaderList.get(idx);
+    IAggregateReader sequenceReader = sequenceReaderList.get(idx);
+    AggregateFunction function = functions.get(idx);
+
+    function.init();
+
+    //skip the points with timestamp less than startTime
+    skipExessData(idx, sequenceReader, unsequenceReader);
+
+    // Read the cached state only now: skipExessData may have replaced the cached batch.
+    // Reading it earlier left stale locals here and the freshly loaded batch was dropped.
+    BatchData batchData = batchDataList.get(idx);
+    boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
+
+    //there was unprocessed data in last batch
+    if (hasCachedSequenceData && batchData.hasNext()) {
+      function.calculateValueFromPageData(batchData, unsequenceReader, endTime);
+      if (batchData.hasNext()) {
+        // the cached batch still holds unconsumed points, so the sequence data of this
+        // partition is complete; only unsequence data is left to check
+        function.calculateValueFromUnsequenceReader(unsequenceReader, endTime);
+        return function.getResult().deepCopy();
+      }
+    }
+    hasCachedSequenceData = false;
+
+    //continue checking sequence data
+    while (sequenceReader.hasNext()) {
+      PageHeader pageHeader = sequenceReader.nextPageHeader();
+
+      //memory data
+      if (pageHeader == null) {
+        batchData = sequenceReader.nextBatch();
+        function.calculateValueFromPageData(batchData, unsequenceReader, endTime);
+        //no point in sequence data with a timestamp less than endTime
+        if (batchData.hasNext()) {
+          hasCachedSequenceData = true;
+          break;
+        }
+        // batch fully consumed: there is no page header to inspect, so move on to the next
+        // batch instead of dereferencing the null header below
+        continue;
+      }
+
+      //page data
+      long minTime = pageHeader.getMinTimestamp();
+      long maxTime = pageHeader.getMaxTimestamp();
+      //no point in sequence data with a timestamp less than endTime
+      if (minTime >= endTime) {
+        hasCachedSequenceData = true;
+        batchData = sequenceReader.nextBatch();
+        break;
+      }
+
+      if (canUseHeader(minTime, maxTime, unsequenceReader, function)) {
+        //cal using page header
+        // NOTE(review): skipExessData calls sequenceReader.skipPageData() after discarding a
+        // page by its header; confirm whether the page data must be skipped here as well.
+        function.calculateValueFromPageHeader(pageHeader);
+      } else {
+        //cal using page data
+        batchData = sequenceReader.nextBatch();
+        function.calculateValueFromPageData(batchData, unsequenceReader, endTime);
+        if (batchData.hasNext()) {
+          hasCachedSequenceData = true;
+          break;
+        }
+      }
+
+    }
+
+    function.calculateValueFromUnsequenceReader(unsequenceReader, endTime);
+    hasCachedSequenceDataList.set(idx, hasCachedSequenceData);
+    batchDataList.set(idx, batchData);
+    return function.getResult().deepCopy();
+  }
+
+  /**
+   * skip the points with timestamp less than startTime.
+   *
+   * <p>Unsequence points before {@code startTime} are always dropped. Sequence data is only
+   * advanced when a cached batch exists and is exhausted by the skipping; in that case the
+   * batch and flag stored for {@code idx} are updated.
+   *
+   * @param idx index of the series in the selected series list
+   * @param sequenceReader sequence reader of the series
+   * @param unsequenceReader unsequence reader of the series
+   */
+  private void skipExessData(int idx, IAggregateReader sequenceReader,
+      IPointReader unsequenceReader)
+      throws IOException {
+    BatchData batchData = batchDataList.get(idx);
+    boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
+
+    //skip the unsequenceReader points with timestamp less than startTime
+    while (unsequenceReader.hasNext()
+        && unsequenceReader.current().getTimestamp() < startTime) {
+      unsequenceReader.next();
+    }
+
+    //skip the cached batch data points with timestamp less than startTime
+    if (hasCachedSequenceData) {
+      while (batchData.hasNext() && batchData.currentTime() < startTime) {
+        batchData.next();
+      }
+    }
+    if (hasCachedSequenceData && !batchData.hasNext()) {
+      hasCachedSequenceData = false;
+    } else {
+      // nothing cached, or the cached batch still holds points at/after startTime
+      return;
+    }
+
+    //skip the points in sequenceReader data whose timestamp are less than startTime
+    while (sequenceReader.hasNext()) {
+      PageHeader pageHeader = sequenceReader.nextPageHeader();
+      //memory data
+      if (pageHeader == null) {
+        batchData = sequenceReader.nextBatch();
+        hasCachedSequenceData = true;
+        while (batchData.hasNext() && batchData.currentTime() < startTime) {
+          batchData.next();
+        }
+        // keep a batch that still has points at/after startTime; continuing unconditionally
+        // would let the next iteration overwrite it and lose those points
+        if (batchData.hasNext()) {
+          break;
+        }
+        continue;
+      }
+      //timestamps of all points in the page are less than startTime
+      if (pageHeader.getMaxTimestamp() < startTime) {
+        sequenceReader.skipPageData();
+        continue;
+      }
+      //timestamps of all points in the page are greater or equal to startTime,
+      //don't need to skip
+      if (pageHeader.getMinTimestamp() >= startTime) {
+        break;
+      }
+      //the page has overlap with startTime
+      batchData = sequenceReader.nextBatch();
+      hasCachedSequenceData = true;
+      while (batchData.hasNext() && batchData.currentTime() < startTime) {
+        batchData.next();
+      }
+      break;
+    }
+
+    batchDataList.set(idx, batchData);
+    hasCachedSequenceDataList.set(idx, hasCachedSequenceData);
+  }
+
+  /**
+   * Decides whether a sequence page can be aggregated from its header alone.
+   *
+   * <p>Side effect: when the time filter check passes, the unsequence reader is fed to the
+   * function with {@code minTime} as the bound (presumably consuming the unsequence points
+   * before the page; verify against AggregateFunction).
+   *
+   * @param minTime min timestamp of the page
+   * @param maxTime max timestamp of the page
+   * @return false if the time filter rejects the page range via containStartEndTime, or the
+   *     next unsequence point has a timestamp not greater than {@code maxTime}; true otherwise
+   */
+  private boolean canUseHeader(long minTime, long maxTime, IPointReader unSequenceReader,
+      AggregateFunction function)
+      throws IOException, ProcessorException {
+    boolean filterAcceptsPage =
+        timeFilter == null || timeFilter.containStartEndTime(minTime, maxTime);
+    if (!filterAcceptsPage) {
+      return false;
+    }
+
+    //cal unsequence data with timestamps between pages.
+    function.calculateValueFromUnsequenceReader(unSequenceReader, minTime);
+
+    boolean unseqOverlapsPage = unSequenceReader.hasNext()
+        && unSequenceReader.current().getTimestamp() <= maxTime;
+    return !unseqOverlapsPage;
+  }
+}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
new file mode 100644
index 0000000..077905d
--- /dev/null
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Group-by data set used when the query expression is evaluated through a timestamp generator:
+ * the generator yields the qualifying timestamps, and every selected series is aggregated at
+ * those timestamps through a by-timestamp reader.
+ */
+public class GroupByWithValueFilterDataSet extends GroupByEngine {
+
+  private List<EngineReaderByTimeStamp> allDataReaderList;
+  private TimeGenerator timestampGenerator;
+
+  // last timestamp pulled from timestampGenerator
+  private long timestamp;
+  // true while `timestamp` has been pulled but not yet handed to the aggregate functions
+  private boolean hasCachedTimestamp;
+
+  //group by batch calculation size.
+  private int timeStampFetchSize;
+
+  /**
+   * constructor. All arguments are forwarded to {@link GroupByEngine}; the batch size is set to
+   * ten times the configured fetch size.
+   */
+  public GroupByWithValueFilterDataSet(long jobId, List<Path> paths, long unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals) {
+    super(jobId, paths, unit, origin, mergedIntervals);
+    this.allDataReaderList = new ArrayList<>();
+    this.timeStampFetchSize = 10 * IoTDBDescriptor.getInstance().getConfig().getFetchSize();
+  }
+
+  /**
+   * init reader and aggregate function: registers the query with the QueryTokenManager, builds
+   * the timestamp generator for the expression and one by-timestamp reader per selected series.
+   */
+  public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+    initAggreFuction(context, aggres, expression);
+
+    QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, expression);
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries);
+    this.timestampGenerator = new EngineTimeGenerator(jobId, expression, context);
+    this.allDataReaderList = SeriesReaderFactory
+        .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context);
+  }
+
+  /**
+   * Aggregates every generated timestamp that is smaller than endTime and returns one row
+   * stamped with startTime. The first timestamp found at or beyond endTime is kept in
+   * {@code timestamp} so that a later call can start from it.
+   *
+   * @throws IOException if hasNext() has not been called first, or if reading fails
+   */
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException(
+          "need to call hasNext() before calling next() in GroupByWithValueFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+    for (AggregateFunction function : functions) {
+      function.init();
+    }
+
+    List<Long> timestampList = new ArrayList<>(timeStampFetchSize);
+    if (hasCachedTimestamp) {
+      if (timestamp < endTime) {
+        hasCachedTimestamp = false;
+        timestampList.add(timestamp);
+      } else {
+        // the cached timestamp is not inside this interval: all fields are empty
+        return constructRowRecord();
+      }
+    }
+
+    while (timestampGenerator.hasNext()) {
+      //construct timestamp list
+      for (int cnt = 1; cnt < timeStampFetchSize; cnt++) {
+        if (!timestampGenerator.hasNext()) {
+          break;
+        }
+        timestamp = timestampGenerator.next();
+        if (timestamp < endTime) {
+          timestampList.add(timestamp);
+        } else {
+          hasCachedTimestamp = true;
+          break;
+        }
+      }
+
+      //cal result using timestamp list
+      aggregateTimestamps(timestampList);
+
+      timestampList.clear();
+      //judge if it's end
+      if (timestamp >= endTime) {
+        hasCachedTimestamp = true;
+        break;
+      }
+    }
+
+    // only reachable with content when the generator was already exhausted and the list holds
+    // just the previously cached timestamp
+    if (!timestampList.isEmpty()) {
+      aggregateTimestamps(timestampList);
+    }
+    return constructRowRecord();
+  }
+
+  /**
+   * Feeds one batch of timestamps to the aggregate function of every selected series.
+   */
+  private void aggregateTimestamps(List<Long> timestampList) throws IOException {
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      functions.get(i).calcAggregationUsingTimestamps(timestampList, allDataReaderList.get(i));
+    }
+  }
+
+  /**
+   * Builds the row of the current interval: time is startTime, one field per aggregate function.
+   */
+  private RowRecord constructRowRecord() {
+    RowRecord record = new RowRecord(startTime);
+    for (AggregateFunction function : functions) {
+      record.addField(getField(function.getResult()));
+    }
+    return record;
+  }
+}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 8a959da..9ad6d52 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -20,29 +20,38 @@
package org.apache.iotdb.db.query.factory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -175,6 +184,41 @@ public class SeriesReaderFactory {
return new SealedTsFilesReader(seriesInTsFileReader, context);
}
+  /**
+   * Builds one by-timestamp reader per path, in the order of {@code paths}.
+   *
+   * <p>For each path the query data source is looked up, and a sequence reader and an
+   * unsequence merge reader are created, both without filter. When the sequence reader has no
+   * data only the unsequence reader is registered; otherwise both are combined in an
+   * AllDataReader. Either way the reader is registered with HIGH_PRIORITY.
+   *
+   * @param jobId id of the query job, used to look up the data source
+   * @param paths series to create readers for
+   * @param context query context passed to the data source lookup and the sequence reader
+   * @return list with one reader per element of {@code paths}
+   */
+  public static List<EngineReaderByTimeStamp> getByTimestampReadersOfSelectedPaths(
+      long jobId, List<Path> paths, QueryContext context)
+      throws IOException, FileNodeManagerException {
+
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
+
+    for (Path path : paths) {
+
+      QueryDataSource queryDataSource =
+          QueryDataSourceManager.getQueryDataSource(jobId, path, context);
+
+      PriorityMergeReaderByTimestamp mergeReaderByTimestamp =
+          new PriorityMergeReaderByTimestamp();
+
+      // reader for sequence data
+      SequenceDataReader tsFilesReader =
+          new SequenceDataReader(queryDataSource.getSeqDataSource(), null, context);
+
+      // reader for unSequence data
+      PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+          .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
+
+      // tsFilesReader was just constructed, so only its emptiness has to be checked
+      if (!tsFilesReader.hasNext()) {
+        mergeReaderByTimestamp
+            .addReaderWithPriority(unSeqMergeReader, PriorityMergeReader.HIGH_PRIORITY);
+      } else {
+        mergeReaderByTimestamp
+            .addReaderWithPriority(new AllDataReader(tsFilesReader, unSeqMergeReader),
+                PriorityMergeReader.HIGH_PRIORITY);
+      }
+
+      readersOfSelectedSeries.add(mergeReaderByTimestamp);
+    }
+
+    return readersOfSelectedSeries;
+  }
+
private static class SeriesReaderFactoryHelper {
private static final SeriesReaderFactory INSTANCE = new
SeriesReaderFactory();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
b/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
index 241222a..33a5e4d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -51,4 +52,30 @@ public class TimeValuePairUtils {
throw new
UnSupportedDataTypeException(String.valueOf(data.getDataType()));
}
}
+
+  /**
+   * Converts an aggregation result into a (time,value) pair.
+   *
+   * @param data -AggreResultData whose data type decides which typed result getter is read
+   * @return -pair of the result's timestamp and its value wrapped in the matching
+   *     TsPrimitiveType
+   * @throws UnSupportedDataTypeException if the data type is none of INT32, INT64, FLOAT,
+   *     DOUBLE, TEXT, BOOLEAN
+   */
+  public static TimeValuePair getCurrentTimeValuePair(AggreResultData data) {
+    TsPrimitiveType value;
+    switch (data.getDataType()) {
+      case INT32:
+        value = new TsPrimitiveType.TsInt(data.getIntRet());
+        break;
+      case INT64:
+        value = new TsPrimitiveType.TsLong(data.getLongRet());
+        break;
+      case FLOAT:
+        value = new TsPrimitiveType.TsFloat(data.getFloatRet());
+        break;
+      case DOUBLE:
+        value = new TsPrimitiveType.TsDouble(data.getDoubleRet());
+        break;
+      case TEXT:
+        value = new TsPrimitiveType.TsBinary(data.getBinaryRet());
+        break;
+      case BOOLEAN:
+        value = new TsPrimitiveType.TsBoolean(data.isBooleanRet());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(data.getDataType()));
+    }
+    return new TimeValuePair(data.getTimestamp(), value);
+  }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
b/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
index ea43c28..d77704e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
public abstract class TsPrimitiveType implements Serializable {
/**
- * get tsPrimitiveType by dataType.
+ * get tsPrimitiveType by resultDataType.
*
* @param dataType -given TsDataType
* @param v -
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
new file mode 100644
index 0000000..dc2fd4c
--- /dev/null
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.integration.Constant.count;
+import static org.apache.iotdb.db.integration.Constant.first;
+import static org.apache.iotdb.db.integration.Constant.last;
+import static org.apache.iotdb.db.integration.Constant.max_time;
+import static org.apache.iotdb.db.integration.Constant.max_value;
+import static org.apache.iotdb.db.integration.Constant.mean;
+import static org.apache.iotdb.db.integration.Constant.min_time;
+import static org.apache.iotdb.db.integration.Constant.min_value;
+import static org.apache.iotdb.db.integration.Constant.sum;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for GROUP BY queries. Every test starts a daemon, loads {@code dataSet1}
+ * through JDBC and runs each aggregation twice: once with a time filter and once with a value
+ * filter.
+ */
+public class IOTDBGroupByTestIT {
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final String JDBC_URL = "jdbc:iotdb://127.0.0.1:6667/";
+  private static final String TEMPERATURE = "root.ln.wf01.wt01.temperature";
+  private static final String GROUP_BY_CLAUSE =
+      "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])";
+  private static final String INSERT_PREFIX =
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) ";
+
+  private static IoTDB daemon;
+
+  private static final String[] dataSet1 = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+      INSERT_PREFIX + "values(1, 1.1, false, 11)",
+      INSERT_PREFIX + "values(2, 2.2, true, 22)",
+      INSERT_PREFIX + "values(3, 3.3, false, 33 )",
+      INSERT_PREFIX + "values(4, 4.4, false, 44)",
+      INSERT_PREFIX + "values(5, 5.5, false, 55)",
+      "flush",
+      INSERT_PREFIX + "values(100, 100.1, false, 110)",
+      INSERT_PREFIX + "values(150, 200.2, true, 220)",
+      INSERT_PREFIX + "values(200, 300.3, false, 330 )",
+      INSERT_PREFIX + "values(250, 400.4, false, 440)",
+      INSERT_PREFIX + "values(300, 500.5, false, 550)",
+      "flush",
+      INSERT_PREFIX + "values(10, 10.1, false, 110)",
+      INSERT_PREFIX + "values(20, 20.2, true, 220)",
+      INSERT_PREFIX + "values(30, 30.3, false, 330 )",
+      INSERT_PREFIX + "values(40, 40.4, false, 440)",
+      INSERT_PREFIX + "values(50, 50.5, false, 550)",
+      "flush",
+      INSERT_PREFIX + "values(500, 100.1, false, 110)",
+      INSERT_PREFIX + "values(510, 200.2, true, 220)",
+      INSERT_PREFIX + "values(520, 300.3, false, 330 )",
+      INSERT_PREFIX + "values(530, 400.4, false, 440)",
+      INSERT_PREFIX + "values(540, 500.5, false, 550)",
+      "flush",
+      INSERT_PREFIX + "values(580, 100.1, false, 110)",
+      INSERT_PREFIX + "values(590, 200.2, true, 220)",
+      INSERT_PREFIX + "values(600, 300.3, false, 330 )",
+      INSERT_PREFIX + "values(610, 400.4, false, 440)",
+      INSERT_PREFIX + "values(620, 500.5, false, 550)",
+  };
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    daemon.stop();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * count / sum / mean per group, with a time filter and with a value filter.
+   */
+  @Test
+  public void countSumMeanTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,1,4.4,4.4",
+        "5,3,35.8,11.933333333333332",
+        "25,1,30.3,30.3",
+        "50,1,50.5,50.5",
+        "65,0,0.0,null",
+        "85,1,100.1,100.1",
+        "105,0,0.0,null",
+        "125,0,0.0,null",
+        "145,1,200.2,200.2",
+        "310,0,0.0,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,2,7.7,3.85",
+        "5,3,35.8,11.933333333333332",
+        "25,1,30.3,30.3",
+        "50,1,50.5,50.5",
+        "65,0,0.0,null",
+        "85,1,100.1,100.1",
+        "105,0,0.0,null",
+        "125,0,0.0,null",
+        "145,1,200.2,200.2",
+        "310,0,0.0,null"
+    };
+    String select = "select count(temperature), sum(temperature), mean(temperature) "
+        + "from root.ln.wf01.wt01 where ";
+
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root")) {
+      assertQueryResult(connection, select + "time > 3 " + GROUP_BY_CLAUSE, retArray1,
+          count(TEMPERATURE), sum(TEMPERATURE), mean(TEMPERATURE));
+      assertQueryResult(connection, select + "temperature > 3 " + GROUP_BY_CLAUSE, retArray2,
+          count(TEMPERATURE), sum(TEMPERATURE), mean(TEMPERATURE));
+    }
+  }
+
+  /**
+   * max_value / min_value / max_time / min_time per group, with a time filter and with a value
+   * filter.
+   */
+  @Test
+  public void maxMinValeTimeTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4.4,4,4",
+        "5,20.2,5.5,20,5",
+        "25,30.3,30.3,30,30",
+        "50,50.5,50.5,50,50",
+        "65,null,null,null,null",
+        "85,100.1,100.1,100,100",
+        "105,null,null,null,null",
+        "125,null,null,null,null",
+        "145,200.2,200.2,150,150",
+        "310,null,null,null,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,4.4,3.3,4,3",
+        "5,20.2,5.5,20,5",
+        "25,30.3,30.3,30,30",
+        "50,50.5,50.5,50,50",
+        "65,null,null,null,null",
+        "85,100.1,100.1,100,100",
+        "105,null,null,null,null",
+        "125,null,null,null,null",
+        "145,200.2,200.2,150,150",
+        "310,null,null,null,null"
+    };
+    String select = "select max_value(temperature), min_value(temperature), "
+        + "max_time(temperature), min_time(temperature) from root.ln.wf01.wt01 where ";
+
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root")) {
+      assertQueryResult(connection, select + "time > 3 " + GROUP_BY_CLAUSE, retArray1,
+          max_value(TEMPERATURE), min_value(TEMPERATURE),
+          max_time(TEMPERATURE), min_time(TEMPERATURE));
+      assertQueryResult(connection, select + "temperature > 3 " + GROUP_BY_CLAUSE, retArray2,
+          max_value(TEMPERATURE), min_value(TEMPERATURE),
+          max_time(TEMPERATURE), min_time(TEMPERATURE));
+    }
+  }
+
+  /**
+   * first / last per group. Columns are read by name, always as (last, first), independent of
+   * their order in the select clause.
+   */
+  @Test
+  public void firstLastTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4.4",
+        "5,20.2,5.5",
+        "25,30.3,30.3",
+        "50,50.5,50.5",
+        "65,null,null",
+        "85,100.1,100.1",
+        "105,null,null",
+        "125,null,null",
+        "145,200.2,200.2",
+        "310,null,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,4.4,3.3",
+        "5,20.2,5.5",
+        "25,30.3,30.3",
+        "50,50.5,50.5",
+        "65,null,null",
+        "85,100.1,100.1",
+        "105,null,null",
+        "125,null,null",
+        "145,200.2,200.2",
+        "310,null,null"
+    };
+
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root")) {
+      assertQueryResult(connection,
+          "select last(temperature), first(temperature) from root.ln.wf01.wt01 where time > 3 "
+              + GROUP_BY_CLAUSE,
+          retArray1, last(TEMPERATURE), first(TEMPERATURE));
+      assertQueryResult(connection,
+          "select first(temperature), last(temperature) from root.ln.wf01.wt01 "
+              + "where temperature > 3 " + GROUP_BY_CLAUSE,
+          retArray2, last(TEMPERATURE), first(TEMPERATURE));
+    }
+  }
+
+  /**
+   * Loads {@code dataSet1}. Failures are propagated so that a broken setup is reported as such
+   * instead of surfacing later as wrong query results.
+   */
+  public void prepareData() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (String sql : dataSet1) {
+        statement.execute(sql);
+      }
+    }
+  }
+
+  /**
+   * Runs {@code sql} and compares every returned row, rendered as
+   * "Time,col1,col2,..." using the given column names, with {@code expectedRows}; the number of
+   * rows must match as well. The statement is closed even when an assertion fails.
+   */
+  private static void assertQueryResult(Connection connection, String sql,
+      String[] expectedRows, String... columns) throws SQLException {
+    try (Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute(sql);
+      Assert.assertTrue(hasResultSet);
+
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        // append(String) renders a null column as "null", like string concatenation does
+        StringBuilder row = new StringBuilder().append(resultSet.getString(TIMESTAMP_STR));
+        for (String column : columns) {
+          row.append(',').append(resultSet.getString(column));
+        }
+        if (cnt >= expectedRows.length) {
+          fail("unexpected extra row: " + row);
+        }
+        Assert.assertEquals(expectedRows[cnt], row.toString());
+        cnt++;
+      }
+      Assert.assertEquals(expectedRows.length, cnt);
+    }
+  }
+
+}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
index bd1e938..e1f5f13 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
@@ -52,7 +52,7 @@ public class IoTDBAggregationTestIT {
"CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
"CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
"CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
- "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN"
};
private static String[] dataSet2 = new String[]{
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index d8ebfa9..3b46f55 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -81,7 +81,7 @@ public class PhysicalPlanTest {
String metadata = "create timeseries root.vehicle.d1.s1 with
datatype=INT32,encoding=RLE";
QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
MetadataPlan plan = (MetadataPlan)
processor.parseSQLToPhysicalPlan(metadata);
- assertEquals("seriesPath: root.vehicle.d1.s1\n" + "dataType: INT32\n" +
"encoding: RLE\n"
+ assertEquals("seriesPath: root.vehicle.d1.s1\n" + "resultDataType:
INT32\n" + "encoding: RLE\n"
+ "namespace type: ADD_PATH\n" + "args: ", plan.toString());
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index f2beb7d..469b3cc 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -112,12 +112,13 @@ public class MemIntQpExecutor extends
QueryProcessExecutor {
}
@Override
- public QueryDataSet groupBy(List<Pair<Path, String>> aggres, IExpression
expression, long unit,
- long origin,
- List<Pair<Long, Long>> intervals, int fetchSize) {
+ public QueryDataSet groupBy(List<Path> paths, List<String> aggres,
IExpression expression,
+ long unit, long origin, List<Pair<Long, Long>> intervals)
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException, QueryFilterOptimizationException {
return null;
}
+
@Override
public boolean judgePathExists(Path path) {
if (SQLConstant.isReservedPath(path)) {
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
new file mode 100644
index 0000000..ee2488b
--- /dev/null
+++
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the time partitions that a GroupByEngine produces for a given unit, origin and list of
+ * query intervals.
+ */
+public class GroupByEngineTest {
+
+  @Test
+  public void test1() {
+    List<Pair<Long, Long>> intervals = new ArrayList<>();
+    intervals.add(new Pair<>(805L, 811L));
+    intervals.add(new Pair<>(825L, 849L));
+
+    checkPartitions(20, 810, intervals,
+        new long[]{805, 810, 830},
+        new long[]{810, 830, 850});
+  }
+
+  @Test
+  public void test2() {
+    checkPartitions(20, 850, fourIntervals(),
+        new long[]{805, 810, 830, 850, 1200, 1210},
+        new long[]{810, 830, 850, 870, 1210, 1230});
+  }
+
+  @Test
+  public void test3() {
+    checkPartitions(20, 100, fourIntervals(),
+        new long[]{805, 820, 850, 1200, 1210},
+        new long[]{820, 840, 860, 1210, 1230});
+  }
+
+  @Test
+  public void test4() {
+    checkPartitions(200, 100, fourIntervals(),
+        new long[]{805, 1200},
+        new long[]{900, 1300});
+  }
+
+  /**
+   * Interval list shared by test2, test3 and test4.
+   */
+  private static List<Pair<Long, Long>> fourIntervals() {
+    List<Pair<Long, Long>> intervals = new ArrayList<>();
+    intervals.add(new Pair<>(805L, 835L));
+    intervals.add(new Pair<>(850L, 855L));
+    intervals.add(new Pair<>(858L, 860L));
+    intervals.add(new Pair<>(1200L, 1220L));
+    return intervals;
+  }
+
+  /**
+   * Iterates the engine built from the arguments (job id 1000, no paths) and asserts that the
+   * i-th partition equals (expectedStarts[i], expectedEnds[i]) and that no more partitions are
+   * produced than expected values exist.
+   */
+  private static void checkPartitions(long unit, long origin, List<Pair<Long, Long>> intervals,
+      long[] expectedStarts, long[] expectedEnds) {
+    GroupByEngine engine =
+        new GroupByWithValueFilterDataSet(1000L, null, unit, origin, intervals);
+    int idx = 0;
+    while (engine.hasNext()) {
+      // raw Pair on purpose: keeps assertEquals resolving to the (Object, Object) overload
+      Pair partition = engine.nextTimePartition();
+      Assert.assertTrue(idx < expectedStarts.length);
+      Assert.assertEquals(expectedStarts[idx], partition.left);
+      Assert.assertEquals(expectedEnds[idx], partition.right);
+      idx++;
+    }
+  }
+
+}
\ No newline at end of file
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index 4c00641..c46da70 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -34,6 +34,10 @@ public abstract class QueryDataSet {
this.dataTypes = dataTypes;
}
+ /**
+ * Creates a data set for the given paths only. This constructor does not assign
+ * dataTypes.
+ *
+ * @param paths series paths of this data set
+ */
+ public QueryDataSet(List<Path> paths) {
+ this.paths = paths;
+ }
+
/**
* This method is used for batch query.
*/
@@ -52,4 +56,7 @@ public abstract class QueryDataSet {
return dataTypes;
}
+ /**
+ * Replaces the data type list of this data set.
+ *
+ * @param dataTypes data types to store; kept by reference, not copied
+ */
+ public void setDataTypes(List<TSDataType> dataTypes) {
+ this.dataTypes = dataTypes;
+ }
}