This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new aec99a7 modify for pr review suggestion
aec99a7 is described below
commit aec99a77902e2cd2da770a63f3d4e0f7eb83d6b5
Author: suyue <[email protected]>
AuthorDate: Fri Mar 22 21:12:33 2019 +0800
modify for pr review suggestion
---
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 14 +++++------
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 26 ++++++++++++---------
.../db/query/aggregation/AggregateFunction.java | 9 ++++----
.../db/query/aggregation/impl/CountAggrFunc.java | 7 +++---
.../db/query/aggregation/impl/FirstAggrFunc.java | 7 +++---
.../db/query/aggregation/impl/LastAggrFunc.java | 9 ++++----
.../db/query/aggregation/impl/MaxTimeAggrFunc.java | 14 +++++------
.../query/aggregation/impl/MaxValueAggrFunc.java | 7 +++---
.../db/query/aggregation/impl/MeanAggrFunc.java | 11 ++++-----
.../db/query/aggregation/impl/MinTimeAggrFunc.java | 11 ++++-----
.../query/aggregation/impl/MinValueAggrFunc.java | 7 +++---
.../db/query/executor/AggregateEngineExecutor.java | 13 +++++++----
.../EngineExecutorWithoutTimeGenerator.java | 19 ++++-----------
.../iotdb/db/query/executor/EngineQueryRouter.java | 9 ++++----
.../executor/GroupByWithValueFilterDataSet.java | 17 ++++++++------
.../db/query/factory/SeriesReaderFactory.java | 16 +++++++++----
.../java/org/apache/iotdb/db/query/fill/IFill.java | 7 ++----
.../query/timegenerator/EngineNodeConstructor.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 27 ++++++++++++----------
.../org/apache/iotdb/db/qp/plan/QPUpdateTest.java | 9 ++++----
20 files changed, 118 insertions(+), 123 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 9e7a5ae..b8c20d7 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -51,7 +51,6 @@ import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.LoadDataUtils;
@@ -182,13 +181,13 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
public QueryDataSet aggregate(List<Path> paths, List<String> aggres,
IExpression expression)
throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException,
PathErrorException, IOException {
- return new EngineQueryRouter().aggregate(paths, aggres, expression);
+ return queryRouter.aggregate(paths, aggres, expression);
}
@Override
public QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType, IFill> fillTypes)
throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException {
- return new EngineQueryRouter().fill(fillPaths, queryTime, fillTypes);
+ return queryRouter.fill(fillPaths, queryTime, fillTypes);
}
@Override
@@ -196,7 +195,7 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
long unit, long origin, List<Pair<Long, Long>> intervals)
throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException,
PathErrorException, IOException {
- return new EngineQueryRouter().groupBy(paths, aggres, expression, unit,
origin, intervals);
+ return queryRouter.groupBy(paths, aggres, expression, unit, origin,
intervals);
}
@Override
@@ -520,8 +519,8 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
if (!columnSchema.getType().equals(dataType)
|| !columnSchema.getEncodingType().equals(encoding)) {
throw new ProcessorException(String.format(
- "The resultDataType or encoding of the last node %s is
conflicting in the storage group %s",
- lastNode, fileNodePath));
+ "The resultDataType or encoding of the last node %s is
conflicting "
+ + "in the storage group %s", lastNode, fileNodePath));
}
mManager.addPathToMTree(path.getFullPath(), dataType, encoding,
compressor, props);
numSchemaMap.put(lastNode, numSchemaMap.get(lastNode) + 1);
@@ -535,7 +534,8 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
if (isNewMeasurement) {
// add time series to schema
fileNodeManager.addTimeSeries(path, dataType, encoding,
compressor, props);
- //TODO fileNodeManager.addTimeSeries(path, resultDataType,
encoding, compressor, encodingArgs);
+ //TODO fileNodeManager.addTimeSeries(
+ //TODO path, resultDataType, encoding, compressor,
encodingArgs);
}
// fileNodeManager.closeOneFileNode(namespacePath);
} catch (FileNodeManagerException e) {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index ea2def9..f456ac4 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -46,33 +46,37 @@ import org.apache.iotdb.tsfile.utils.Pair;
public abstract class QueryProcessExecutor {
protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
- private EngineQueryRouter queryRouter = new EngineQueryRouter();
+ protected EngineQueryRouter queryRouter = new EngineQueryRouter();
public QueryProcessExecutor() {
}
- public QueryDataSet processQuery(PhysicalPlan plan)
+ /**
+ * process query plan of qp layer, construct queryDataSet.
+ * @param queryPlan QueryPlan
+ * @return QueryDataSet
+ */
+ public QueryDataSet processQuery(QueryPlan queryPlan)
throws IOException, FileNodeManagerException, PathErrorException,
QueryFilterOptimizationException, ProcessorException {
- QueryPlan queryPlan = (QueryPlan) plan;
QueryExpression queryExpression =
QueryExpression.create().setSelectSeries(queryPlan.getPaths())
.setExpression(queryPlan.getExpression());
- if (plan instanceof GroupByPlan) {
- GroupByPlan groupByPlan = (GroupByPlan) plan;
+ if (queryPlan instanceof GroupByPlan) {
+ GroupByPlan groupByPlan = (GroupByPlan) queryPlan;
return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
groupByPlan.getExpression(), groupByPlan.getUnit(),
groupByPlan.getOrigin(),
groupByPlan.getIntervals());
}
- if (plan instanceof AggregationPlan) {
- return aggregate(plan.getPaths(), plan.getAggregations(),
- ((AggregationPlan) plan).getExpression());
+ if (queryPlan instanceof AggregationPlan) {
+ return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
+ ((AggregationPlan) queryPlan).getExpression());
}
- if (plan instanceof FillQueryPlan) {
- FillQueryPlan fillQueryPlan = (FillQueryPlan) plan;
- return fill(plan.getPaths(), fillQueryPlan.getQueryTime(),
fillQueryPlan.getFillType());
+ if (queryPlan instanceof FillQueryPlan) {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+ return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
fillQueryPlan.getFillType());
}
return queryRouter.query(queryExpression);
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index bb225e5..377a856 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -91,7 +90,7 @@ public abstract class AggregateFunction {
/**
* <p>
- * Calculate the aggregation with data in unsequenceReader.
+ * Calculate the aggregation with data in unsequenceReader.
* </p>
*
* @param unsequenceReader unsequence data reader
@@ -101,7 +100,7 @@ public abstract class AggregateFunction {
/**
* <p>
- * Calculate the aggregation with data whose timestamp is less than bound
in unsequenceReader.
+ * Calculate the aggregation with data whose timestamp is less than bound in
unsequenceReader.
* </p>
*
* @param unsequenceReader unsequence data reader
@@ -119,19 +118,19 @@ public abstract class AggregateFunction {
* @throws IOException TsFile data read error
* @throws ProcessorException wrong aggregation method parameter
*/
- public abstract void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public abstract void calcAggregationUsingTimestamps(long[] timestamps, int
length,
EngineReaderByTimeStamp dataReader) throws IOException;
/**
* Judge if aggregation results have been calculated. In other words, if the
aggregated result
* does not need to compute the remaining data, it returns true.
+ *
* @return If the aggregation result has been calculated return true, else
return false.
*/
public abstract boolean isCalculatedAggregationResult();
/**
* Return data type of aggregation function result data.
- * @return
*/
public TSDataType getResultDataType() {
return resultDataType;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 301ce9a..f3fb04e 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -151,11 +150,11 @@ public class CountAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
int cnt = 0;
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
cnt++;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index 99d7a6a..749393a 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -139,14 +138,14 @@ public class FirstAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
if (resultData.isSetTime()) {
return;
}
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
resultData.putTimeAndValue(0, value.getValue());
break;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index 151dba5..bd583a0 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -120,15 +119,15 @@ public class LastAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
long time = -1;
Object lastVal = null;
- for (int i = 0; i < timestamps.size(); i++) {
- TsPrimitiveType value =
dataReader.getValueInTimestamp(timestamps.get(i));
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
- time = timestamps.get(i);
+ time = timestamps[i];
lastVal = value.getValue();
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index f6c6fa9..9bebe5c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -72,11 +71,10 @@ public class MaxTimeAggrFunc extends AggregateFunction {
long bound) throws IOException, ProcessorException {
long time = -1;
while (dataInThisPage.hasNext()) {
- if(dataInThisPage.currentTime() < bound){
+ if (dataInThisPage.currentTime() < bound) {
time = dataInThisPage.currentTime();
dataInThisPage.next();
- }
- else{
+ } else {
break;
}
}
@@ -111,13 +109,13 @@ public class MaxTimeAggrFunc extends AggregateFunction {
//TODO Consider how to reverse order in dataReader(EngineReaderByTimeStamp)
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
long time = -1;
- for (int i = 0; i < timestamps.size(); i++) {
- TsPrimitiveType value =
dataReader.getValueInTimestamp(timestamps.get(i));
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
- time = timestamps.get(i);
+ time = timestamps[i];
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index ceee209..2d2cb84 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -162,11 +161,11 @@ public class MaxValueAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
Comparable<Object> maxVal = null;
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value == null) {
continue;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index 36f7337..3692cdc 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -148,7 +147,7 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
- throws IOException, ProcessorException {
+ throws IOException {
while (unsequenceReader.hasNext()) {
TimeValuePair pair = unsequenceReader.next();
updateMean(seriesDataType, pair.getValue().getValue());
@@ -157,7 +156,7 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromUnsequenceReader(IPointReader
unsequenceReader, long bound)
- throws IOException, ProcessorException {
+ throws IOException {
while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < bound) {
TimeValuePair pair = unsequenceReader.next();
updateMean(seriesDataType, pair.getValue().getValue());
@@ -165,10 +164,10 @@ public class MeanAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
updateMean(seriesDataType, value.getValue());
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index f6bc642..3c2f6c3 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -83,7 +82,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader,
- long bound) throws IOException, ProcessorException {
+ long bound) throws IOException {
if (resultData.isSetValue()) {
return;
}
@@ -137,16 +136,16 @@ public class MinTimeAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
if (resultData.isSetValue()) {
return;
}
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value != null) {
resultData.setTimestamp(0);
- resultData.setLongRet(time);
+ resultData.setLongRet(timestamps[i]);
return;
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index 3184c33..1c0ca1e 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
-import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
@@ -164,11 +163,11 @@ public class MinValueAggrFunc extends AggregateFunction {
}
@Override
- public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ public void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException {
Comparable<Object> minVal = null;
- for (long time : timestamps) {
- TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ for (int i = 0; i < length; i++) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(timestamps[i]);
if (value == null) {
continue;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 8348433..26b1137 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -132,6 +132,7 @@ public class AggregateEngineExecutor {
/**
* calculation aggregate result with only time filter or no filter for one
series.
+ *
* @param function aggregate function
* @param sequenceReader sequence data reader
* @param unSequenceReader unsequence data reader
@@ -284,23 +285,24 @@ public class AggregateEngineExecutor {
List<AggregateFunction> aggregateFunctions,
EngineTimeGenerator timestampGenerator,
List<EngineReaderByTimeStamp> readersOfSelectedSeries)
- throws IOException, ProcessorException {
+ throws IOException {
while (timestampGenerator.hasNext()) {
//generate timestamps for aggregate
- List<Long> timestamps = new ArrayList<>(aggregateFetchSize);
+ long[] timeArray = new long[aggregateFetchSize];
+ int timeArrayLength = 0;
for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
if (!timestampGenerator.hasNext()) {
break;
}
- timestamps.add(timestampGenerator.next());
+ timeArray[timeArrayLength++] = timestampGenerator.next();
}
//cal part of aggregate result
for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
- aggregateFunctions.get(i)
- .calcAggregationUsingTimestamps(timestamps,
readersOfSelectedSeries.get(i));
+ aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray,
timeArrayLength,
+ readersOfSelectedSeries.get(i));
}
}
@@ -313,6 +315,7 @@ public class AggregateEngineExecutor {
/**
* using aggregate result data list construct QueryDataSet.
+ *
* @param aggreResultDataList aggregate result data list
*/
private QueryDataSet constructDataSet(List<AggreResultData>
aggreResultDataList)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 6eb300e..975271f 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -99,14 +99,8 @@ public class EngineExecutorWithoutTimeGenerator {
throw new FileNodeManagerException(e);
}
- if (tsFilesReader == null) {
- //only have unsequence data.
- readersOfSelectedSeries.add(unSeqMergeReader);
- } else {
- //merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader,
unSeqMergeReader));
- }
-
+ //merge sequence data with unsequence data.
+ readersOfSelectedSeries.add(new AllDataReader(tsFilesReader,
unSeqMergeReader));
}
try {
@@ -159,13 +153,8 @@ public class EngineExecutorWithoutTimeGenerator {
throw new FileNodeManagerException(e);
}
- if (tsFilesReader == null) {
- //only have unsequence data.
- readersOfSelectedSeries.add(unSeqMergeReader);
- } else {
- //merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader,
unSeqMergeReader));
- }
+ //merge sequence data with unsequence data.
+ readersOfSelectedSeries.add(new AllDataReader(tsFilesReader,
unSeqMergeReader));
}
try {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 176cac0..c060922 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -125,7 +125,7 @@ public class EngineQueryRouter {
}
} else {
AggregateEngineExecutor engineExecutor = new
AggregateEngineExecutor(nextJobId,
- selectedSeries, aggres, expression);
+ selectedSeries, aggres, null);
return engineExecutor.executeWithOutTimeGenerator(context);
}
}
@@ -203,10 +203,10 @@ public class EngineQueryRouter {
/**
* execute fill query.
+ *
* @param fillPaths select path list
* @param queryTime timestamp
* @param fillType type IFill map
- * @return
*/
public QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType, IFill> fillType)
throws FileNodeManagerException, PathErrorException, IOException,
ProcessorException {
@@ -240,9 +240,8 @@ public class EngineQueryRouter {
// if the current interval does not overlap with the previous, simply
append it.
if (merged.isEmpty() || merged.getLast().right < interval.left) {
merged.add(interval);
- }
- // otherwise, there is overlap, so we merge the current and previous
intervals.
- else {
+ } else {
+ // otherwise, there is overlap, so we merge the current and previous
intervals.
merged.getLast().right = Math.max(merged.getLast().right,
interval.right);
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
index 3fbcdd8..a4002f2 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
@@ -84,11 +84,12 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngine {
function.init();
}
- List<Long> timestampList = new ArrayList<>(timeStampFetchSize);
+ long[] timestampArray = new long[timeStampFetchSize];
+ int timeArrayLength = 0;
if (hasCachedTimestamp) {
if (timestamp < endTime) {
hasCachedTimestamp = false;
- timestampList.add(timestamp);
+ timestampArray[timeArrayLength++] = timestamp;
} else {
//所有域均为空
return constructRowRecord();
@@ -103,7 +104,7 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngine {
}
timestamp = timestampGenerator.next();
if (timestamp < endTime) {
- timestampList.add(timestamp);
+ timestampArray[timeArrayLength++] = timestamp;
} else {
hasCachedTimestamp = true;
break;
@@ -112,10 +113,11 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngine {
//cal result using timestamp list
for (int i = 0; i < selectedSeries.size(); i++) {
- functions.get(i).calcAggregationUsingTimestamps(timestampList,
allDataReaderList.get(i));
+ functions.get(i).calcAggregationUsingTimestamps(
+ timestampArray, timeArrayLength, allDataReaderList.get(i));
}
- timestampList.clear();
+ timeArrayLength = 0;
//judge if it's end
if (timestamp >= endTime) {
hasCachedTimestamp = true;
@@ -123,10 +125,11 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngine {
}
}
- if(!timestampList.isEmpty()){
+ if (timeArrayLength > 0) {
//cal result using timestamp list
for (int i = 0; i < selectedSeries.size(); i++) {
- functions.get(i).calcAggregationUsingTimestamps(timestampList,
allDataReaderList.get(i));
+ functions.get(i).calcAggregationUsingTimestamps(
+ timestampArray, timeArrayLength, allDataReaderList.get(i));
}
}
return constructRowRecord();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 9ad6d52..8e47a65 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -51,7 +51,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -153,7 +152,7 @@ public class SeriesReaderFactory {
// UnSequence merge reader
IPointReader unSeqMergeReader =
createUnSeqMergeReader(overflowSeriesDataSource,
singleSeriesExpression.getFilter());
- if (seriesInTsFileReader == null || !seriesInTsFileReader.hasNext()) {
+ if (!seriesInTsFileReader.hasNext()) {
//only have unsequence data.
return unSeqMergeReader;
} else {
@@ -184,8 +183,15 @@ public class SeriesReaderFactory {
return new SealedTsFilesReader(seriesInTsFileReader, context);
}
- public static List<EngineReaderByTimeStamp>
getByTimestampReadersOfSelectedPaths(long jobId , List<Path> paths,
- QueryContext context) throws IOException, FileNodeManagerException {
+ /**
+ * construct ByTimestampReader, include sequential data and unsequential
data.
+ * @param jobId query jobId
+ * @param paths selected series path
+ * @param context query context
+ * @return the list of EngineReaderByTimeStamp
+ */
+ public static List<EngineReaderByTimeStamp>
getByTimestampReadersOfSelectedPaths(long jobId,
+ List<Path> paths, QueryContext context) throws IOException,
FileNodeManagerException {
List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
@@ -204,7 +210,7 @@ public class SeriesReaderFactory {
PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
- if (tsFilesReader == null || !tsFilesReader.hasNext()) {
+ if (!tsFilesReader.hasNext()) {
mergeReaderByTimestamp
.addReaderWithPriority(unSeqMergeReader,
PriorityMergeReader.HIGH_PRIORITY);
} else {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index be77c94..faca2aa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.query.fill;
import java.io.IOException;
@@ -63,11 +64,7 @@ public abstract class IFill {
PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(),
timeFilter);
- if (sequenceReader == null) {
- allDataReader = unSeqMergeReader;
- } else {
- allDataReader = new AllDataReader(sequenceReader, unSeqMergeReader);
- }
+ allDataReader = new AllDataReader(sequenceReader, unSeqMergeReader);
}
public abstract IPointReader getFillResult() throws IOException;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index 3d5b3e4..c5aa51f 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -102,7 +102,7 @@ public class EngineNodeConstructor {
PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(),
filter);
- if (tsFilesReader == null || !tsFilesReader.hasNext()) {
+ if (!tsFilesReader.hasNext()) {
//only have unsequence data.
return unSeqMergeReader;
} else {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 9aa7383..834dde3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -24,7 +24,6 @@ import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -207,7 +206,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
}
- private TS_Status getErrorStatus(String message){
+ private TS_Status getErrorStatus(String message) {
TS_Status status = new TS_Status(TS_StatusCode.ERROR_STATUS);
status.setErrorMessage(message);
return status;
@@ -231,7 +230,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
resp.setShowTimeseriesList(showTimeseriesList);
} catch (PathErrorException e) {
status = getErrorStatus(
- String.format("Failed to fetch timeseries %s's metadata
because: %s",
+ String.format("Failed to fetch timeseries %s's metadata because:
%s",
req.getColumnPath(), e));
resp.setStatus(status);
return resp;
@@ -240,8 +239,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
.error(String.format("Failed to fetch timeseries %s's metadata",
req.getColumnPath()),
outOfMemoryError);
status = getErrorStatus(
- String.format("Failed to fetch timeseries %s's metadata
because: %s",
- req.getColumnPath(), outOfMemoryError));
+ String.format("Failed to fetch timeseries %s's metadata because:
%s",
+ req.getColumnPath(), outOfMemoryError));
resp.setStatus(status);
return resp;
}
@@ -252,14 +251,15 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
Set<String> storageGroups =
MManager.getInstance().getAllStorageGroup();
resp.setShowStorageGroups(storageGroups);
} catch (PathErrorException e) {
- status = getErrorStatus(String.format("Failed to fetch storage
groups' metadata because: %s", e));
+ status = getErrorStatus(
+ String.format("Failed to fetch storage groups' metadata because:
%s", e));
resp.setStatus(status);
return resp;
} catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
LOGGER.error("Failed to fetch storage groups' metadata",
outOfMemoryError);
status = getErrorStatus(
- String.format("Failed to fetch storage groups' metadata
because: %s",
- outOfMemoryError));
+ String.format("Failed to fetch storage groups' metadata because:
%s",
+ outOfMemoryError));
resp.setStatus(status);
return resp;
}
@@ -271,7 +271,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
metadataInJson = MManager.getInstance().getMetadataInString();
} catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
LOGGER.error("Failed to fetch all metadata in json",
outOfMemoryError);
- status = getErrorStatus(String.format("Failed to fetch all metadata
in json because: %s", outOfMemoryError));
+ status = getErrorStatus(
+ String.format("Failed to fetch all metadata in json because:
%s", outOfMemoryError));
resp.setStatus(status);
return resp;
}
@@ -296,7 +297,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return resp;
} catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
LOGGER.error("Failed to get delta object map", outOfMemoryError);
- status = getErrorStatus(String.format("Failed to get delta object
map because: %s", outOfMemoryError));
+ status = getErrorStatus(
+ String.format("Failed to get delta object map because: %s",
outOfMemoryError));
break;
}
status = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
@@ -318,7 +320,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
try {
resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
} catch (PathErrorException e) {
- status = getErrorStatus(String.format("Failed to fetch %s's all
columns because: %s", req.getColumnPath(), e));
+ status = getErrorStatus(String
+ .format("Failed to fetch %s's all columns because: %s",
req.getColumnPath(), e));
resp.setStatus(status);
return resp;
} catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
@@ -590,7 +593,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
if (!queryRet.get().containsKey(statement)) {
PhysicalPlan physicalPlan = queryStatus.get().get(statement);
processor.getExecutor().setFetchSize(fetchSize);
- queryDataSet = processor.getExecutor().processQuery(physicalPlan);
+ queryDataSet = processor.getExecutor().processQuery((QueryPlan)
physicalPlan);
queryRet.get().put(statement, queryDataSet);
} else {
queryDataSet = queryRet.get().get(statement);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
index 02f8a69..42e279d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -122,7 +123,7 @@ public class QPUpdateTest {
// query to assert
sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1";
PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
- QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan2);
+ QueryDataSet queryDataSet =
processor.getExecutor().processQuery((QueryPlan) plan2);
String[] expect = {"10 33000 null", "20 null 10"};
int i = 0;
while (queryDataSet.hasNext()) {
@@ -144,7 +145,7 @@ public class QPUpdateTest {
PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
//
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
"sensor_1");
//
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
"sensor_2");
- QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan2);
+ QueryDataSet queryDataSet =
processor.getExecutor().processQuery((QueryPlan) plan2);
String[] expect = {"20 null 10"};
int i = 0;
@@ -167,7 +168,7 @@ public class QPUpdateTest {
PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
//
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
"sensor_1");
//
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
"sensor_2");
- QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan2);
+ QueryDataSet queryDataSet =
processor.getExecutor().processQuery((QueryPlan) plan2);
String[] expect = {"20 null 10"};
int i = 0;
@@ -192,7 +193,7 @@ public class QPUpdateTest {
// query to assert
sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1";
PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
- QueryDataSet queryDataSet = processor.getExecutor().processQuery(plan2);
+ QueryDataSet queryDataSet =
processor.getExecutor().processQuery((QueryPlan) plan2);
String[] expect = {"13 50 40", "20 null 10"};
int i = 0;