This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40ae1e9 Cherry pick changes from cluster_new (#901)
40ae1e9 is described below
commit 40ae1e9fa1f50056bf51c61c2ffef38e8ad80253
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Mar 13 20:01:36 2020 +0800
Cherry pick changes from cluster_new (#901)
* cherry pick changes from cluster_new:
1. getAllClosedStorageGroupTsFile now groups files by partition
2. fix empty AggregateResult not being serialized correctly
3. fix merging two empty AvgAggrResults producing a wrong result
4. fix reset in First/LastValue
5. change member protection levels
6. extract GroupByExecutor and LocalGroupByExecutor
7. extract getters of readers and datasets
8. extract fill initialization
9. fix last query not calling the right get-node method
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 24 ++-
.../org/apache/iotdb/db/metadata/MManager.java | 2 +-
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 4 +
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 1 -
.../db/query/aggregation/AggregateResult.java | 100 +++++-----
.../db/query/aggregation/impl/AvgAggrResult.java | 9 +
.../aggregation/impl/FirstValueAggrResult.java | 6 +
.../aggregation/impl/LastValueAggrResult.java | 6 +
.../dataset/groupby/GroupByEngineDataSet.java | 13 +-
.../dataset/groupby/GroupByExecutor.java} | 32 +--
.../groupby/GroupByWithValueFilterDataSet.java | 26 ++-
.../groupby/GroupByWithoutValueFilterDataSet.java | 216 ++++-----------------
.../dataset/groupby/LocalGroupByExecutor.java | 203 +++++++++++++++++++
.../db/query/executor/AggregationExecutor.java | 16 +-
.../iotdb/db/query/executor/FillQueryExecutor.java | 11 +-
.../iotdb/db/query/executor/LastQueryExecutor.java | 2 +-
.../iotdb/db/query/executor/QueryRouter.java | 23 ++-
.../java/org/apache/iotdb/db/query/fill/IFill.java | 8 +
.../iotdb/db/query/reader/series/SeriesReader.java | 4 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 35 ++--
.../org/apache/iotdb/db/utils/FilePathUtils.java | 6 +
.../tsfile/read/query/dataset/QueryDataSet.java | 3 +
22 files changed, 457 insertions(+), 293 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 9680f39..93df289 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -529,15 +529,23 @@ public class StorageEngine implements IService {
/**
*
- * @return TsFiles (seq or unseq) grouped by their storage group.
+ * @return TsFiles (seq or unseq) grouped by their storage group and
partition number.
*/
- public Map<String, List<TsFileResource>> getAllClosedStorageGroupTsFile() {
- Map<String, List<TsFileResource>> ret = new HashMap<>();
- for (Entry<String, StorageGroupProcessor> entry : processorMap
- .entrySet()) {
- ret.computeIfAbsent(entry.getKey(), sg -> new
ArrayList<>()).addAll(entry.getValue().getSequenceFileTreeSet());
- ret.get(entry.getKey()).addAll(entry.getValue().getUnSequenceFileList());
- ret.get(entry.getKey()).removeIf(file -> !file.isClosed());
+ public Map<String, Map<Long, List<TsFileResource>>>
getAllClosedStorageGroupTsFile() {
+ Map<String, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
+ for (Entry<String, StorageGroupProcessor> entry : processorMap.entrySet())
{
+ List<TsFileResource> allResources =
entry.getValue().getSequenceFileTreeSet();
+ allResources.addAll(entry.getValue().getUnSequenceFileList());
+ for (TsFileResource sequenceFile : allResources) {
+ if (!sequenceFile.isClosed()) {
+ continue;
+ }
+ String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
+ long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
+ Map<Long, List<TsFileResource>> storageGroupFiles =
ret.computeIfAbsent(entry.getKey()
+ ,n -> new HashMap<>());
+ storageGroupFiles.computeIfAbsent(partitionNum, n -> new
ArrayList<>()).add(sequenceFile);
+ }
}
return ret;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 00b21cc..afd72f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -713,7 +713,7 @@ public class MManager {
/**
* Get node by path
*/
- MNode getNodeByPath(String path) throws MetadataException {
+ public MNode getNodeByPath(String path) throws MetadataException {
lock.readLock().lock();
try {
return mtree.getNodeByPath(path);
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 8a3f6d8..1f06dbf 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.db.qp.logical.Operator;
public class AggregationPlan extends RawDataQueryPlan {
+ // e.g., for select count(s1), count(s1), count(s2), count(s2), sum (s1)
+ // aggregations are count, count, count, count, sum
+ // deduplicatedAggregations are count, count, sum
+
private List<String> aggregations = new ArrayList<>();
private List<String> deduplicatedAggregations = new ArrayList<>();
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 45c53a7..4e06220 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -299,7 +299,6 @@ public class PhysicalGenerator {
try {
// remove stars in SELECT to get actual paths
List<String> actualPaths =
getMatchedTimeseries(fullPath.getFullPath());
-
// for actual non exist path
if (actualPaths.isEmpty() && originAggregations.isEmpty()) {
String nonExistMeasurement = fullPath.getMeasurement();
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index ee083ee..832e69a 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -44,7 +44,7 @@ public abstract class AggregateResult {
private double doubleValue;
private Binary binaryValue;
- private boolean hasResult;
+ protected boolean hasResult;
/**
* construct.
@@ -110,29 +110,32 @@ public abstract class AggregateResult {
TSDataType dataType = TSDataType.deserialize(buffer.getShort());
AggregateResult aggregateResult = AggregateResultFactory
.getAggrResultByType(aggregationType, dataType);
- switch (dataType) {
- case BOOLEAN:
- aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
- break;
- case INT32:
- aggregateResult.setIntValue(buffer.getInt());
- break;
- case INT64:
- aggregateResult.setLongValue(buffer.getLong());
- break;
- case FLOAT:
- aggregateResult.setFloatValue(buffer.getFloat());
- break;
- case DOUBLE:
- aggregateResult.setDoubleValue(buffer.getDouble());
- break;
- case TEXT:
- aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
- break;
- default:
- throw new IllegalArgumentException("Invalid Aggregation Type: " +
dataType.name());
+ boolean hasResult = ReadWriteIOUtils.readBool(buffer);
+ if (hasResult) {
+ switch (dataType) {
+ case BOOLEAN:
+ aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
+ break;
+ case INT32:
+ aggregateResult.setIntValue(buffer.getInt());
+ break;
+ case INT64:
+ aggregateResult.setLongValue(buffer.getLong());
+ break;
+ case FLOAT:
+ aggregateResult.setFloatValue(buffer.getFloat());
+ break;
+ case DOUBLE:
+ aggregateResult.setDoubleValue(buffer.getDouble());
+ break;
+ case TEXT:
+ aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid Aggregation Type: " +
dataType.name());
+ }
+ aggregateResult.deserializeSpecificFields(buffer);
}
- aggregateResult.deserializeSpecificFields(buffer);
return aggregateResult;
}
@@ -141,29 +144,32 @@ public abstract class AggregateResult {
public void serializeTo(OutputStream outputStream) throws IOException {
aggregationType.serializeTo(outputStream);
ReadWriteIOUtils.write(resultDataType, outputStream);
- switch (resultDataType) {
- case BOOLEAN:
- ReadWriteIOUtils.write(booleanValue, outputStream);
- break;
- case INT32:
- ReadWriteIOUtils.write(intValue, outputStream);
- break;
- case INT64:
- ReadWriteIOUtils.write(longValue, outputStream);
- break;
- case FLOAT:
- ReadWriteIOUtils.write(floatValue, outputStream);
- break;
- case DOUBLE:
- ReadWriteIOUtils.write(doubleValue, outputStream);
- break;
- case TEXT:
- ReadWriteIOUtils.write(binaryValue, outputStream);
- break;
- default:
- throw new IllegalArgumentException("Invalid Aggregation Type: " +
resultDataType.name());
+ ReadWriteIOUtils.write(hasResult(), outputStream);
+ if (hasResult()) {
+ switch (resultDataType) {
+ case BOOLEAN:
+ ReadWriteIOUtils.write(booleanValue, outputStream);
+ break;
+ case INT32:
+ ReadWriteIOUtils.write(intValue, outputStream);
+ break;
+ case INT64:
+ ReadWriteIOUtils.write(longValue, outputStream);
+ break;
+ case FLOAT:
+ ReadWriteIOUtils.write(floatValue, outputStream);
+ break;
+ case DOUBLE:
+ ReadWriteIOUtils.write(doubleValue, outputStream);
+ break;
+ case TEXT:
+ ReadWriteIOUtils.write(binaryValue, outputStream);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid Aggregation Type: " +
resultDataType.name());
+ }
+ serializeSpecificFields(outputStream);
}
- serializeSpecificFields(outputStream);
}
protected abstract void serializeSpecificFields(OutputStream outputStream)
throws IOException;
@@ -294,4 +300,8 @@ public abstract class AggregateResult {
public String toString() {
return String.valueOf(getResult());
}
+
+ public AggregationType getAggregationType() {
+ return aggregationType;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 1e44444..e93d0e6 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -45,6 +45,11 @@ public class AvgAggrResult extends AggregateResult {
}
@Override
+ protected boolean hasResult() {
+ return cnt > 0;
+ }
+
+ @Override
public Double getResult() {
if (cnt > 0) {
setDoubleValue(avg);
@@ -120,6 +125,10 @@ public class AvgAggrResult extends AggregateResult {
@Override
public void merge(AggregateResult another) {
AvgAggrResult anotherAvg = (AvgAggrResult) another;
+ if (anotherAvg.cnt == 0) {
+ // avoid two empty results producing an NaN
+ return;
+ }
avg = avg * ((double) cnt / (cnt + anotherAvg.cnt)) +
anotherAvg.avg * ((double) anotherAvg.cnt / (cnt + anotherAvg.cnt));
cnt += anotherAvg.cnt;
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 52da9c0..2dc3e2b 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -42,6 +42,12 @@ public class FirstValueAggrResult extends AggregateResult {
}
@Override
+ public void reset() {
+ super.reset();
+ timestamp = Long.MAX_VALUE;
+ }
+
+ @Override
public Object getResult() {
return hasResult() ? getValue() : null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 2077af4..13a6a67 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -41,6 +41,12 @@ public class LastValueAggrResult extends AggregateResult {
}
@Override
+ public void reset() {
+ super.reset();
+ timestamp = Long.MIN_VALUE;
+ }
+
+ @Override
public Object getResult() {
return hasResult() ? getValue() : null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 94d290e..4ca7ceb 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -27,18 +27,21 @@ import org.apache.iotdb.tsfile.utils.Pair;
public abstract class GroupByEngineDataSet extends QueryDataSet {
protected long queryId;
- private long interval;
- private long slidingStep;
+ protected long interval;
+ protected long slidingStep;
// total query [startTime, endTime)
- private long startTime;
- private long endTime;
+ protected long startTime;
+ protected long endTime;
// current interval [curStartTime, curEndTime)
protected long curStartTime;
protected long curEndTime;
- private int usedIndex;
+ protected int usedIndex;
protected boolean hasCachedTimeInterval;
+ public GroupByEngineDataSet() {
+ }
+
/**
* groupBy query.
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
copy to
server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
index 4109382..d5f492e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
@@ -16,27 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.utils;
-import java.io.File;
+package org.apache.iotdb.db.query.dataset.groupby;
-public class FilePathUtils {
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
- private FilePathUtils() {
- // forbidding instantiation
- }
+
+/**
+ * Each executor calculates results of all aggregations on this series
+ */
+public interface GroupByExecutor {
/**
- * Format file path to end with File.separator
- * @param filePath origin file path
- * @return Regularized Path
+ * add reusable result cache in executor
*/
- public static String regularizePath(String filePath){
- if (filePath.length() > 0
- && filePath.charAt(filePath.length() - 1) != File.separatorChar) {
- filePath = filePath + File.separatorChar;
- }
- return filePath;
- }
+ void addAggregateResult(AggregateResult aggrResult);
+ /**
+ * calculate result in [curStartTime, curEndTime)
+ */
+ List<AggregateResult> calcResult(long curStartTime, long curEndTime) throws
IOException, QueryProcessException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 44402cc..a951001 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -29,11 +29,14 @@ import
org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -53,7 +56,10 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngineDataSet {
/**
* group by batch calculation size.
*/
- private int timeStampFetchSize;
+ protected int timeStampFetchSize;
+
+ public GroupByWithValueFilterDataSet() {
+ }
/**
* constructor.
@@ -74,18 +80,28 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngineDataSet {
/**
* init reader and aggregate function.
*/
- private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+ protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
throws StorageEngineException {
- this.timestampGenerator = new
ServerTimeGenerator(groupByPlan.getExpression(), context);
+ this.timestampGenerator = getTimeGenerator(groupByPlan.getExpression(),
context);
this.allDataReaderList = new ArrayList<>();
this.groupByPlan = groupByPlan;
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
- allDataReaderList.add(new SeriesReaderByTimestamp(path,
dataTypes.get(i), context,
- QueryResourceManager.getInstance().getQueryDataSource(path, context,
null), null));
+ allDataReaderList.add(getReaderByTime(path, dataTypes.get(i), context,
null));
}
}
+ protected TimeGenerator getTimeGenerator(IExpression expression,
QueryContext context)
+ throws StorageEngineException {
+ return new ServerTimeGenerator(expression, context);
+ }
+
+ protected IReaderByTimestamp getReaderByTime(Path path,
+ TSDataType dataType, QueryContext context, TsFileFilter fileFilter)
throws StorageEngineException {
+ return new SeriesReaderByTimestamp(path, dataType, context,
+ QueryResourceManager.getInstance().getQueryDataSource(path, context,
null), fileFilter);
+ }
+
@Override
protected RowRecord nextWithoutConstraint() throws IOException {
if (!hasCachedTimeInterval) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 023cbf3..445d0a6 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -25,26 +25,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.query.reader.series.IAggregateReader;
-import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +47,22 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
.getLogger(GroupByWithoutValueFilterDataSet.class);
private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>();
- private TimeRange timeRange;
+
+ /**
+ * path -> result index for each aggregation
+ *
+ * e.g.,
+ *
+ * deduplicated paths : s1, s2, s1
+ * deduplicated aggregations : count, count, sum
+ *
+ * s1 -> 0, 2
+ * s2 -> 1
+ */
+ private Map<Path, List<Integer>> resultIndexes = new HashMap<>();
+
+ public GroupByWithoutValueFilterDataSet() {
+ }
/**
* constructor.
@@ -66,7 +74,7 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
initGroupBy(context, groupByPlan);
}
- private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+ protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
throws StorageEngineException {
IExpression expression = groupByPlan.getExpression();
@@ -75,17 +83,19 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}
+ // init resultIndexes, group result indexes by path
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
if (!pathExecutors.containsKey(path)) {
//init GroupByExecutor
pathExecutors.put(path,
- new GroupByExecutor(path, dataTypes.get(i), context, timeFilter));
+ getGroupByExecutor(path, dataTypes.get(i), context, timeFilter,
null));
+ resultIndexes.put(path, new ArrayList<>());
}
+ resultIndexes.get(path).add(i);
AggregateResult aggrResult = AggregateResultFactory
-
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
- dataTypes.get(i));
- pathExecutors.get(path).addAggregateResult(aggrResult, i);
+
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
dataTypes.get(i));
+ pathExecutors.get(path).addAggregateResult(aggrResult);
}
}
@@ -97,17 +107,16 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
}
hasCachedTimeInterval = false;
RowRecord record = new RowRecord(curStartTime);
- timeRange = new TimeRange(curStartTime, curEndTime - 1);
AggregateResult[] fields = new AggregateResult[paths.size()];
try {
- for (Entry<Path, GroupByExecutor> pathGroupByExecutorEntry :
pathExecutors.entrySet()) {
- GroupByExecutor executor = pathGroupByExecutorEntry.getValue();
- executor.resetAggregateResults();
- List<Pair<AggregateResult, Integer>> aggregations =
executor.calcResult();
- for (Pair<AggregateResult, Integer> aggregation : aggregations) {
- fields[aggregation.right] = aggregation.left;
+ for (Entry<Path, GroupByExecutor> pathToExecutorEntry :
pathExecutors.entrySet()) {
+ GroupByExecutor executor = pathToExecutorEntry.getValue();
+ List<AggregateResult> aggregations = executor.calcResult(curStartTime,
curEndTime);
+ for (int i = 0; i < aggregations.size(); i++) {
+ int resultIndex =
resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
+ fields[resultIndex] = aggregations.get(i);
}
}
} catch (QueryProcessException e) {
@@ -125,165 +134,10 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
return record;
}
- private class GroupByExecutor {
-
- private IAggregateReader reader;
- private BatchData preCachedData;
- //<aggFunction - indexForRecord> of path
- private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
-
- GroupByExecutor(Path path, TSDataType dataType, QueryContext context,
Filter timeFilter)
- throws StorageEngineException {
- QueryDataSource queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSource(path, context, timeFilter);
- // update filter by TTL
- timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
- this.reader = new SeriesAggregateReader(path, dataType, context,
queryDataSource, timeFilter,
- null, null);
- this.preCachedData = null;
- }
-
- private List<Pair<AggregateResult, Integer>> calcResult()
- throws IOException, QueryProcessException {
- if (calcFromCacheData()) {
- return results;
- }
-
- //read page data firstly
- if (readAndCalcFromPage()) {
- return results;
- }
-
- //read chunk finally
- while (reader.hasNextChunk()) {
- Statistics chunkStatistics = reader.currentChunkStatistics();
- if (chunkStatistics.getStartTime() >= curEndTime) {
- return results;
- }
- //calc from chunkMetaData
- if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
- new TimeRange(chunkStatistics.getStartTime(),
chunkStatistics.getEndTime()))) {
- calcFromStatistics(chunkStatistics);
- reader.skipCurrentChunk();
- if(isEndCalc()){
- return results;
- }
- continue;
- }
- if (readAndCalcFromPage()) {
- return results;
- }
- }
- return results;
- }
-
- private void addAggregateResult(AggregateResult aggrResult, int index) {
- results.add(new Pair<>(aggrResult, index));
- }
-
- private boolean isEndCalc() {
- for (Pair<AggregateResult, Integer> result : results) {
- if (!result.left.isCalculatedAggregationResult()) {
- return false;
- }
- }
- return true;
- }
-
- private boolean calcFromCacheData() throws IOException {
- calcFromBatch(preCachedData);
- // The result is calculated from the cache
- return (preCachedData != null && preCachedData.getMaxTimestamp() >=
curEndTime)
- || isEndCalc();
- }
-
- private void calcFromBatch(BatchData batchData) throws IOException {
- // is error data
- if (batchData == null
- || !batchData.hasCurrent()
- || batchData.getMaxTimestamp() < curStartTime
- || batchData.currentTime() >= curEndTime) {
- return;
- }
-
- for (Pair<AggregateResult, Integer> result : results) {
- //current agg method has been calculated
- if (result.left.isCalculatedAggregationResult()) {
- continue;
- }
- //lazy reset batch data for calculation
- batchData.resetBatchData();
- //skip points that cannot be calculated
- while (batchData.currentTime() < curStartTime &&
batchData.hasCurrent()) {
- batchData.next();
- }
- if (batchData.hasCurrent()) {
- result.left.updateResultFromPageData(batchData, curEndTime);
- }
- }
- //can calc for next interval
- if (batchData.getMaxTimestamp() >= curEndTime) {
- preCachedData = batchData;
- }
- }
-
- private void calcFromStatistics(Statistics statistics)
- throws QueryProcessException {
- for (Pair<AggregateResult, Integer> result : results) {
- //cacl is compile
- if (result.left.isCalculatedAggregationResult()) {
- continue;
- }
- result.left.updateResultFromStatistics(statistics);
- }
- }
-
- // clear all results
- private void resetAggregateResults() {
- for (Pair<AggregateResult, Integer> result : results) {
- result.left.reset();
- }
- }
-
-
- private boolean readAndCalcFromPage() throws IOException,
QueryProcessException {
- while (reader.hasNextPage()) {
- Statistics pageStatistics = reader.currentPageStatistics();
- //must be non overlapped page
- if (pageStatistics != null) {
- //current page max than time range
- if (pageStatistics.getStartTime() >= curEndTime) {
- return true;
- }
- //can use pageHeader
- if (reader.canUseCurrentPageStatistics() && timeRange.contains(
- new TimeRange(pageStatistics.getStartTime(),
pageStatistics.getEndTime()))) {
- calcFromStatistics(pageStatistics);
- reader.skipCurrentPage();
- if (isEndCalc()) {
- return true;
- }
- continue;
- }
- }
- // calc from page data
- BatchData batchData = reader.nextPage();
- if (batchData == null || !batchData.hasCurrent()) {
- continue;
- }
- // stop calc and cached current batchData
- if (batchData.currentTime() >= curEndTime) {
- preCachedData = batchData;
- return true;
- }
-
- calcFromBatch(batchData);
- if (isEndCalc() || batchData.currentTime() >= curEndTime) {
- return true;
- }
- }
- return false;
- }
+ protected GroupByExecutor getGroupByExecutor(Path path,
+ TSDataType dataType,
+ QueryContext context, Filter timeFilter, TsFileFilter fileFilter)
+ throws StorageEngineException {
+ return new LocalGroupByExecutor(path, dataType, context, timeFilter,
fileFilter);
}
-
}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
new file mode 100644
index 0000000..7e127b0
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.IAggregateReader;
+import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+public class LocalGroupByExecutor implements GroupByExecutor {
+
+ private IAggregateReader reader;
+ private BatchData preCachedData;
+
+ // Aggregate result buffer of this path
+ private List<AggregateResult> results = new ArrayList<>();
+ private TimeRange timeRange;
+
+ public LocalGroupByExecutor(Path path, TSDataType dataType, QueryContext
context, Filter timeFilter,
+ TsFileFilter fileFilter)
+ throws StorageEngineException {
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+ .getQueryDataSource(path, context, timeFilter);
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+ this.reader = new SeriesAggregateReader(path, dataType, context,
queryDataSource, timeFilter,
+ null, fileFilter);
+ this.preCachedData = null;
+ timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void addAggregateResult(AggregateResult aggrResult) {
+ results.add(aggrResult);
+ }
+
+ private boolean isEndCalc() {
+ for (AggregateResult result : results) {
+ if (!result.isCalculatedAggregationResult()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean calcFromCacheData(long curStartTime, long curEndTime) throws
IOException {
+ calcFromBatch(preCachedData, curStartTime, curEndTime);
+ // the interval is finished if the cached batch reaches curEndTime or beyond, or if every aggregation is complete
+ return (preCachedData != null && preCachedData.getMaxTimestamp() >=
curEndTime)
+ || isEndCalc();
+ }
+
+ private void calcFromBatch(BatchData batchData, long curStartTime, long
curEndTime) throws IOException {
+ // nothing to aggregate: batch is null or exhausted, ends before curStartTime, or its current point is at/after curEndTime
+ if (batchData == null
+ || !batchData.hasCurrent()
+ || batchData.getMaxTimestamp() < curStartTime
+ || batchData.currentTime() >= curEndTime) {
+ return;
+ }
+
+ for (AggregateResult result : results) {
+ // this aggregation is already complete, skip it
+ if (result.isCalculatedAggregationResult()) {
+ continue;
+ }
+ // reset the batch for each aggregation (presumably rewinds it, since the loop below advances it with next())
+ batchData.resetBatchData();
+ // skip points before curStartTime; NOTE(review): currentTime() is evaluated before hasCurrent() here
+ while (batchData.currentTime() < curStartTime && batchData.hasCurrent())
{
+ batchData.next();
+ }
+ if (batchData.hasCurrent()) {
+ result.updateResultFromPageData(batchData, curEndTime);
+ }
+ }
+ // the batch reaches curEndTime or beyond, so cache it for the next interval
+ if (batchData.getMaxTimestamp() >= curEndTime) {
+ preCachedData = batchData;
+ }
+ }
+
+ private void calcFromStatistics(Statistics pageStatistics)
+ throws QueryProcessException {
+ for (AggregateResult result : results) {
+ // calculation of this result is already complete
+ if (result.isCalculatedAggregationResult()) {
+ continue;
+ }
+ result.updateResultFromStatistics(pageStatistics);
+ }
+ }
+
+ @Override
+ public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+ throws IOException, QueryProcessException {
+
+ // reset every aggregate result before computing this interval
+ for (AggregateResult result : results) {
+ result.reset();
+ }
+
+ timeRange.set(curStartTime, curEndTime - 1);
+ if (calcFromCacheData(curStartTime, curEndTime)) {
+ return results;
+ }
+
+ // first, read and aggregate the pages the reader currently offers
+ if (readAndCalcFromPage(curStartTime, curEndTime)) {
+ return results;
+ }
+
+ // then walk the remaining chunks
+ while (reader.hasNextChunk()) {
+ Statistics chunkStatistics = reader.currentChunkStatistics();
+ if (chunkStatistics.getStartTime() >= curEndTime) {
+ return results;
+ }
+ // use the chunk statistics directly when the reader allows it and timeRange.contains() accepts the chunk's start/end times
+ if (reader.canUseCurrentChunkStatistics()
+ && timeRange.contains(chunkStatistics.getStartTime(),
chunkStatistics.getEndTime())) {
+ calcFromStatistics(chunkStatistics);
+ reader.skipCurrentChunk();
+ continue;
+ }
+ if (readAndCalcFromPage(curStartTime, curEndTime)) {
+ return results;
+ }
+ }
+ return results;
+ }
+
+ private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
throws IOException,
+ QueryProcessException {
+ while (reader.hasNextPage()) {
+ Statistics pageStatistics = reader.currentPageStatistics();
+ // NOTE(review): statistics are presumably non-null only for non-overlapped pages - confirm against the reader
+ if (pageStatistics != null) {
+ // the current page starts at or after curEndTime, so this interval is done
+ if (pageStatistics.getStartTime() >= curEndTime) {
+ return true;
+ }
+ // use the page statistics directly when the reader allows it and timeRange.contains() accepts the page's start/end times
+ if (reader.canUseCurrentPageStatistics()
+ && timeRange.contains(pageStatistics.getStartTime(),
pageStatistics.getEndTime())) {
+ calcFromStatistics(pageStatistics);
+ reader.skipCurrentPage();
+ if (isEndCalc()) {
+ return true;
+ }
+ continue;
+ }
+ }
+ // otherwise aggregate from the page's data points
+ BatchData batchData = reader.nextPage();
+ if (batchData == null || !batchData.hasCurrent()) {
+ continue;
+ }
+ // the batch starts at or after curEndTime: cache it for the next interval and stop
+ if (batchData.currentTime() >= curEndTime) {
+ preCachedData = batchData;
+ return true;
+ }
+
+ calcFromBatch(batchData, curStartTime, curEndTime);
+ if (isEndCalc() || batchData.currentTime() >= curEndTime) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6882e7b..1e63f5c 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -57,7 +57,7 @@ public class AggregationExecutor {
private List<Path> selectedSeries;
protected List<TSDataType> dataTypes;
- private List<String> aggregations;
+ protected List<String> aggregations;
protected IExpression expression;
/**
@@ -65,7 +65,7 @@ public class AggregationExecutor {
**/
private int aggregateFetchSize;
- AggregationExecutor(AggregationPlan aggregationPlan) {
+ protected AggregationExecutor(AggregationPlan aggregationPlan) {
this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
this.aggregations = aggregationPlan.getDeduplicatedAggregations();
@@ -78,7 +78,7 @@ public class AggregationExecutor {
*
* @param context query context
*/
- QueryDataSet executeWithoutValueFilter(QueryContext context)
+ public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException, IOException, QueryProcessException {
Filter timeFilter = null;
@@ -109,7 +109,7 @@ public class AggregationExecutor {
* @param context query context
* @return AggregateResult list
*/
- private List<AggregateResult> aggregateOneSeries(
+ protected List<AggregateResult> aggregateOneSeries(
Map.Entry<Path, List<Integer>> pathToAggrIndexes,
Filter timeFilter, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException {
@@ -128,7 +128,7 @@ public class AggregationExecutor {
return aggregateResultList;
}
- private static void aggregateOneSeries(Path seriesPath, QueryContext
context, Filter timeFilter,
+ public static void aggregateOneSeries(Path seriesPath, QueryContext context,
Filter timeFilter,
TSDataType tsDataType, List<AggregateResult> aggregateResultList,
TsFileFilter fileFilter)
throws StorageEngineException, IOException, QueryProcessException {
@@ -227,7 +227,7 @@ public class AggregationExecutor {
*
* @param context query context.
*/
- QueryDataSet executeWithValueFilter(QueryContext context)
+ public QueryDataSet executeWithValueFilter(QueryContext context)
throws StorageEngineException, IOException {
TimeGenerator timestampGenerator = getTimeGenerator(context);
@@ -249,11 +249,11 @@ public class AggregationExecutor {
return constructDataSet(aggregateResults);
}
- private TimeGenerator getTimeGenerator(QueryContext context) throws
StorageEngineException {
+ protected TimeGenerator getTimeGenerator(QueryContext context) throws
StorageEngineException {
return new ServerTimeGenerator(expression, context);
}
- private IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
+ protected IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
QueryContext context) throws StorageEngineException {
return new SeriesReaderByTimestamp(path,
dataType, context,
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index a14742d..08cdde1 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -88,9 +88,7 @@ public class FillQueryExecutor {
} else {
fill = typeIFillMap.get(dataType).copy();
}
- fill.setDataType(dataType);
- fill.setQueryTime(queryTime);
- fill.constructReaders(path, context);
+ configureFill(fill, dataType, path, context, queryTime);
TimeValuePair timeValuePair = fill.getFillResult();
if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -104,4 +102,11 @@ public class FillQueryExecutor {
dataSet.setRecord(record);
return dataSet;
}
+
+ protected void configureFill(IFill fill, TSDataType dataType, Path path,
QueryContext context,
+ long queryTime) throws StorageEngineException {
+ fill.setDataType(dataType);
+ fill.setQueryTime(queryTime);
+ fill.constructReaders(path, context);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 0317fdc..b6aea68 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -111,7 +111,7 @@ public class LastQueryExecutor {
// Retrieve last value from MNode
MNode node = null;
try {
- node =
MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
+ node = MManager.getInstance().getNodeByPath(seriesPath.toString());
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 9807933..6ed625e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -138,12 +138,22 @@ public class QueryRouter implements IQueryRouter {
groupByPlan.setExpression(optimizedExpression);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
- return new GroupByWithoutValueFilterDataSet(context, groupByPlan);
+ return getGroupByWithoutValueFilterDataSet(context, groupByPlan);
} else {
- return new GroupByWithValueFilterDataSet(context, groupByPlan);
+ return getGroupByWithValueFilterDataSet(context, groupByPlan);
}
}
+ protected GroupByWithoutValueFilterDataSet
getGroupByWithoutValueFilterDataSet(QueryContext context, GroupByPlan plan)
+ throws StorageEngineException {
+ return new GroupByWithoutValueFilterDataSet(context, plan);
+ }
+
+ protected GroupByWithValueFilterDataSet
getGroupByWithValueFilterDataSet(QueryContext context, GroupByPlan plan)
+ throws StorageEngineException {
+ return new GroupByWithValueFilterDataSet(context, plan);
+ }
+
@Override
public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
@@ -152,11 +162,18 @@ public class QueryRouter implements IQueryRouter {
long queryTime = fillQueryPlan.getQueryTime();
Map<TSDataType, IFill> fillType = fillQueryPlan.getFillType();
- FillQueryExecutor fillQueryExecutor = new FillQueryExecutor(fillPaths,
dataTypes, queryTime,
+ FillQueryExecutor fillQueryExecutor = getFillExecutor(fillPaths,
dataTypes, queryTime,
fillType);
return fillQueryExecutor.execute(context);
}
+ protected FillQueryExecutor getFillExecutor(
+ List<Path> fillPaths,
+ List<TSDataType> dataTypes, long queryTime,
+ Map<TSDataType, IFill> fillType) {
+ return new FillQueryExecutor(fillPaths, dataTypes, queryTime, fillType);
+ }
+
@Override
public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext
context)
throws StorageEngineException, QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index f1d9a21..d8eb77b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -57,6 +57,14 @@ public abstract class IFill {
timeFilter, null, null);
}
+ public void setAllDataReader(IBatchReader allDataReader) {
+ this.allDataReader = allDataReader;
+ }
+
+ public Filter getFilter() {
+ return constructFilter();
+ }
+
public abstract TimeValuePair getFillResult() throws IOException,
UnSupportedFillTypeException;
public TSDataType getDataType() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 1d7825d..ef27d37 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import java.io.IOException;
import java.util.*;
-class SeriesReader {
+public class SeriesReader {
private final Path seriesPath;
private final TSDataType dataType;
@@ -94,7 +94,7 @@ class SeriesReader {
private boolean hasCachedNextOverlappedPage;
private BatchData cachedBatchData;
- SeriesReader(Path seriesPath, TSDataType dataType, QueryContext context,
+ public SeriesReader(Path seriesPath, TSDataType dataType, QueryContext
context,
QueryDataSource dataSource, Filter timeFilter, Filter valueFilter,
TsFileFilter fileFilter) {
this.seriesPath = seriesPath;
this.dataType = dataType;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 130dd6d..a8afa57 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -239,10 +239,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
for (long statementId : statementIds) {
Set<Long> queryIds = statementId2QueryId.getOrDefault(statementId,
Collections.emptySet());
for (long queryId : queryIds) {
- queryId2DataSet.remove(queryId);
-
try {
- QueryResourceManager.getInstance().endQuery(queryId);
+ releaseQueryResource(queryId);
} catch (StorageEngineException e) {
// release as many as resources as possible, so do not break as soon
as one exception is
// raised
@@ -300,7 +298,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
/**
* release single operation resource
*/
- private void releaseQueryResource(long queryId) throws
StorageEngineException {
+ protected void releaseQueryResource(long queryId) throws
StorageEngineException {
// remove the corresponding Physical Plan
queryId2DataSet.remove(queryId);
QueryResourceManager.getInstance().endQuery(queryId);
@@ -324,8 +322,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
break;
case "COLUMN":
- List<TSDataType> dataTypes = SchemaUtils
-
.getSeriesTypesByString(Collections.singletonList(req.getColumnPath()), null);
+ List<TSDataType> dataTypes =
+
getSeriesTypesByString(Collections.singletonList(req.getColumnPath()), null);
resp.setDataType(dataTypes.get(0).toString());
status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
break;
@@ -753,7 +751,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
for (Path p : paths) {
respColumns.add(p.getFullPath());
}
- seriesTypes = SchemaUtils.getSeriesTypesByString(respColumns, null);
+ seriesTypes = getSeriesTypesByString(respColumns, null);
break;
case AGGREGATION:
case GROUPBY:
@@ -766,7 +764,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
for (int i = 0; i < paths.size(); i++) {
respColumns.add(aggregations.get(i) + "(" +
paths.get(i).getFullPath() + ")");
}
- seriesTypes = SchemaUtils.getSeriesTypesByPath(paths, aggregations);
+ seriesTypes = getSeriesTypesByPath(paths, aggregations);
break;
default:
throw new TException("unsupported query type: " +
plan.getOperatorType());
@@ -786,6 +784,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of
ALIGN_BY_DEVICE result
List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of
ALIGN_BY_DEVICE result
+
Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
Map<String, TSDataType> checker = plan.getMeasurementDataTypeMap();
@@ -820,7 +819,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
plan.setPaths(null);
}
-
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
try {
@@ -839,8 +837,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
fillRpcReturnData(req.fetchSize, queryDataSet,
sessionIdUsernameMap.get(req.sessionId));
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
- QueryResourceManager.getInstance().endQuery(req.queryId);
- queryId2DataSet.remove(req.queryId);
+ releaseQueryResource(req.queryId);
}
TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
resp.setHasResultSet(hasResultSet);
@@ -955,7 +952,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return queryDataSet;
}
- private QueryContext genQueryContext(long queryId) {
+ protected QueryContext genQueryContext(long queryId) {
return new QueryContext(queryId);
}
@@ -1032,7 +1029,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return AuthorityChecker.check(username, paths, plan.getOperatorType(),
targetUser);
}
- void handleClientExit() {
+ protected void handleClientExit() {
Long sessionId = currSessionId.get();
if (sessionId != null) {
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
@@ -1312,7 +1309,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return null;
}
- private TSStatus executePlan(PhysicalPlan plan) {
+ protected TSStatus executePlan(PhysicalPlan plan) {
boolean execRet;
try {
execRet = executeNonQuery(plan);
@@ -1329,4 +1326,14 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
private long generateQueryId(boolean isDataQuery) {
return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
}
+
+ protected List<TSDataType> getSeriesTypesByPath(List<Path> paths,
List<String> aggregations)
+ throws MetadataException {
+ return SchemaUtils.getSeriesTypesByPath(paths, aggregations);
+ }
+
+ protected List<TSDataType> getSeriesTypesByString(List<String> paths, String
aggregation)
+ throws MetadataException {
+ return SchemaUtils.getSeriesTypesByString(paths, aggregation);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 4109382..d89e6fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,9 +19,12 @@
package org.apache.iotdb.db.utils;
import java.io.File;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
public class FilePathUtils {
+ private static final String PATH_SPLIT_STRING = File.separator.equals("\\")
? "\\\\" : "/";
+
private FilePathUtils() {
// forbidding instantiation
}
@@ -39,4 +42,7 @@ public class FilePathUtils {
return filePath;
}
+ public static String[] splitTsFilePath(TsFileResource resource) {
+ return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index f976d0f..bebae01 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -33,6 +33,9 @@ public abstract class QueryDataSet {
protected int rowOffset = 0;
protected int alreadyReturnedRowNum = 0;
+ public QueryDataSet() {
+ }
+
public QueryDataSet(List<Path> paths, List<TSDataType> dataTypes) {
this.paths = paths;
this.dataTypes = dataTypes;