This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cherry_pick_cluster_2 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0a45c19c8f07ca8014ef7d7c4a37fd9956b1de32 Author: jt2594838 <[email protected]> AuthorDate: Wed Mar 11 17:40:39 2020 +0800 cherry pick changes from cluster_new: 1. getAllClosedStorageGroupTsFile is now grouped by partition 2. fix empty AggregationResult is not correctly serialized 3. fix two empty AvgAggrResult merge to a wrong result 4. fix reset in First/LastValue 5. change member protection levels 6. extract GroupByExecutor and LocalGroupByExecutor 7. extract getters of readers and datasets 8. extract fill initialization --- .../org/apache/iotdb/db/engine/StorageEngine.java | 20 ++- .../org/apache/iotdb/db/metadata/MManager.java | 2 +- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 1 - .../qp/strategy/optimizer/ConcatPathOptimizer.java | 2 +- .../db/query/aggregation/AggregateResult.java | 101 ++++++----- .../db/query/aggregation/impl/AvgAggrResult.java | 9 + .../aggregation/impl/FirstValueAggrResult.java | 6 + .../aggregation/impl/LastValueAggrResult.java | 6 + .../dataset/groupby/GroupByEngineDataSet.java | 13 +- .../db/query/dataset/groupby/GroupByExecutor.java | 15 ++ .../groupby/GroupByWithValueFilterDataSet.java | 26 ++- .../groupby/GroupByWithoutValueFilterDataSet.java | 185 ++------------------ .../dataset/groupby/LocalGroupByExecutor.java | 187 +++++++++++++++++++++ .../db/query/executor/AggregationExecutor.java | 16 +- .../iotdb/db/query/executor/FillQueryExecutor.java | 11 +- .../iotdb/db/query/executor/QueryRouter.java | 23 ++- .../java/org/apache/iotdb/db/query/fill/IFill.java | 8 + .../iotdb/db/query/reader/series/SeriesReader.java | 4 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 17 +- .../org/apache/iotdb/db/utils/FilePathUtils.java | 6 + .../iotdb/db/qp/plan/ConcatOptimizerTest.java | 22 +++ .../tsfile/read/query/dataset/QueryDataSet.java | 3 + 22 files changed, 420 insertions(+), 263 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 9680f39..a4923ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -529,15 +529,23 @@ public class StorageEngine implements IService { /** * - * @return TsFiles (seq or unseq) grouped by their storage group. + * @return TsFiles (seq or unseq) grouped by their storage group and partition number. */ - public Map<String, List<TsFileResource>> getAllClosedStorageGroupTsFile() { - Map<String, List<TsFileResource>> ret = new HashMap<>(); + public Map<String, Map<Integer, List<TsFileResource>>> getAllClosedStorageGroupTsFile() { + Map<String, Map<Integer, List<TsFileResource>>> ret = new HashMap<>(); for (Entry<String, StorageGroupProcessor> entry : processorMap .entrySet()) { - ret.computeIfAbsent(entry.getKey(), sg -> new ArrayList<>()).addAll(entry.getValue().getSequenceFileTreeSet()); - ret.get(entry.getKey()).addAll(entry.getValue().getUnSequenceFileList()); - ret.get(entry.getKey()).removeIf(file -> !file.isClosed()); + List<TsFileResource> sequenceFiles = entry.getValue().getSequenceFileTreeSet(); + for (TsFileResource sequenceFile : sequenceFiles) { + if (!sequenceFile.isClosed()) { + continue; + } + String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile); + int partitionNum = Integer.parseInt(fileSplits[fileSplits.length - 2]); + Map<Integer, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey() + ,n -> new HashMap<>()); + storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile); + } } return ret; } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 00b21cc..f5a20ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -885,7 
+885,7 @@ public class MManager { * and the wildcard will be removed. * If the wildcard is at the tail, then the inference will go on until the storage groups are found * and the wildcard will be kept. - * (2) Suppose the part of the path is a substring that begin after the storage group name. (e.g., + * (2) Suppose the path of the path is a substring that begin after the storage group name. (e.g., * For "root.*.sg1.a.*.b.*" and "root.x.sg1" is a storage group, then this part is "a.*.b.*"). * For this part, keep what it is. * diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 45c53a7..4e06220 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -299,7 +299,6 @@ public class PhysicalGenerator { try { // remove stars in SELECT to get actual paths List<String> actualPaths = getMatchedTimeseries(fullPath.getFullPath()); - // for actual non exist path if (actualPaths.isEmpty() && originAggregations.isEmpty()) { String nonExistMeasurement = fullPath.getMeasurement(); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java index 32a9a50..0cf280f 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java @@ -119,7 +119,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { if(!isAlignByDevice){ sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter, filterPaths)); } - filter.setPathSet(filterPaths); + sfwOperator.getFilterOperator().setPathSet(filterPaths); // GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to optimize filter without 
prefix first return sfwOperator; diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java index ee083ee..ad95518 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Objects; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.factory.AggregateResultFactory; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -44,7 +45,7 @@ public abstract class AggregateResult { private double doubleValue; private Binary binaryValue; - private boolean hasResult; + protected boolean hasResult; /** * construct. @@ -110,29 +111,32 @@ public abstract class AggregateResult { TSDataType dataType = TSDataType.deserialize(buffer.getShort()); AggregateResult aggregateResult = AggregateResultFactory .getAggrResultByType(aggregationType, dataType); - switch (dataType) { - case BOOLEAN: - aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer)); - break; - case INT32: - aggregateResult.setIntValue(buffer.getInt()); - break; - case INT64: - aggregateResult.setLongValue(buffer.getLong()); - break; - case FLOAT: - aggregateResult.setFloatValue(buffer.getFloat()); - break; - case DOUBLE: - aggregateResult.setDoubleValue(buffer.getDouble()); - break; - case TEXT: - aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer)); - break; - default: - throw new IllegalArgumentException("Invalid Aggregation Type: " + dataType.name()); + boolean hasResult = ReadWriteIOUtils.readBool(buffer); + if (hasResult) { + switch (dataType) { + case BOOLEAN: + aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer)); + 
break; + case INT32: + aggregateResult.setIntValue(buffer.getInt()); + break; + case INT64: + aggregateResult.setLongValue(buffer.getLong()); + break; + case FLOAT: + aggregateResult.setFloatValue(buffer.getFloat()); + break; + case DOUBLE: + aggregateResult.setDoubleValue(buffer.getDouble()); + break; + case TEXT: + aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer)); + break; + default: + throw new IllegalArgumentException("Invalid Aggregation Type: " + dataType.name()); + } + aggregateResult.deserializeSpecificFields(buffer); } - aggregateResult.deserializeSpecificFields(buffer); return aggregateResult; } @@ -141,29 +145,32 @@ public abstract class AggregateResult { public void serializeTo(OutputStream outputStream) throws IOException { aggregationType.serializeTo(outputStream); ReadWriteIOUtils.write(resultDataType, outputStream); - switch (resultDataType) { - case BOOLEAN: - ReadWriteIOUtils.write(booleanValue, outputStream); - break; - case INT32: - ReadWriteIOUtils.write(intValue, outputStream); - break; - case INT64: - ReadWriteIOUtils.write(longValue, outputStream); - break; - case FLOAT: - ReadWriteIOUtils.write(floatValue, outputStream); - break; - case DOUBLE: - ReadWriteIOUtils.write(doubleValue, outputStream); - break; - case TEXT: - ReadWriteIOUtils.write(binaryValue, outputStream); - break; - default: - throw new IllegalArgumentException("Invalid Aggregation Type: " + resultDataType.name()); + ReadWriteIOUtils.write(hasResult(), outputStream); + if (hasResult()) { + switch (resultDataType) { + case BOOLEAN: + ReadWriteIOUtils.write(booleanValue, outputStream); + break; + case INT32: + ReadWriteIOUtils.write(intValue, outputStream); + break; + case INT64: + ReadWriteIOUtils.write(longValue, outputStream); + break; + case FLOAT: + ReadWriteIOUtils.write(floatValue, outputStream); + break; + case DOUBLE: + ReadWriteIOUtils.write(doubleValue, outputStream); + break; + case TEXT: + ReadWriteIOUtils.write(binaryValue, outputStream); + 
break; + default: + throw new IllegalArgumentException("Invalid Aggregation Type: " + resultDataType.name()); + } + serializeSpecificFields(outputStream); } - serializeSpecificFields(outputStream); } protected abstract void serializeSpecificFields(OutputStream outputStream) throws IOException; @@ -294,4 +301,8 @@ public abstract class AggregateResult { public String toString() { return String.valueOf(getResult()); } + + public AggregationType getAggregationType() { + return aggregationType; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java index 1e44444..e93d0e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java @@ -45,6 +45,11 @@ public class AvgAggrResult extends AggregateResult { } @Override + protected boolean hasResult() { + return cnt > 0; + } + + @Override public Double getResult() { if (cnt > 0) { setDoubleValue(avg); @@ -120,6 +125,10 @@ public class AvgAggrResult extends AggregateResult { @Override public void merge(AggregateResult another) { AvgAggrResult anotherAvg = (AvgAggrResult) another; + if (anotherAvg.cnt == 0) { + // avoid two empty results producing an NaN + return; + } avg = avg * ((double) cnt / (cnt + anotherAvg.cnt)) + anotherAvg.avg * ((double) anotherAvg.cnt / (cnt + anotherAvg.cnt)); cnt += anotherAvg.cnt; diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java index 52da9c0..2dc3e2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java @@ -42,6 +42,12 @@ public class FirstValueAggrResult 
extends AggregateResult { } @Override + public void reset() { + super.reset(); + timestamp = Long.MAX_VALUE; + } + + @Override public Object getResult() { return hasResult() ? getValue() : null; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java index 2077af4..13a6a67 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java @@ -41,6 +41,12 @@ public class LastValueAggrResult extends AggregateResult { } @Override + public void reset() { + super.reset(); + timestamp = Long.MIN_VALUE; + } + + @Override public Object getResult() { return hasResult() ? getValue() : null; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java index 94d290e..4ca7ceb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java @@ -27,18 +27,21 @@ import org.apache.iotdb.tsfile.utils.Pair; public abstract class GroupByEngineDataSet extends QueryDataSet { protected long queryId; - private long interval; - private long slidingStep; + protected long interval; + protected long slidingStep; // total query [startTime, endTime) - private long startTime; - private long endTime; + protected long startTime; + protected long endTime; // current interval [curStartTime, curEndTime) protected long curStartTime; protected long curEndTime; - private int usedIndex; + protected int usedIndex; protected boolean hasCachedTimeInterval; + public GroupByEngineDataSet() { + } + /** * groupBy query. 
*/ diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java new file mode 100644 index 0000000..ced8008 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java @@ -0,0 +1,15 @@ +package org.apache.iotdb.db.query.dataset.groupby; + +import java.io.IOException; +import java.util.List; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.tsfile.utils.Pair; + +public interface GroupByExecutor { + void addAggregateResult(AggregateResult aggrResult, int index); + + void resetAggregateResults(); + + List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime) throws IOException, QueryProcessException; +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java index 44402cc..a951001 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java @@ -29,11 +29,14 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.factory.AggregateResultFactory; +import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import 
org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { @@ -53,7 +56,10 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { /** * group by batch calculation size. */ - private int timeStampFetchSize; + protected int timeStampFetchSize; + + public GroupByWithValueFilterDataSet() { + } /** * constructor. @@ -74,18 +80,28 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { /** * init reader and aggregate function. */ - private void initGroupBy(QueryContext context, GroupByPlan groupByPlan) + protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan) throws StorageEngineException { - this.timestampGenerator = new ServerTimeGenerator(groupByPlan.getExpression(), context); + this.timestampGenerator = getTimeGenerator(groupByPlan.getExpression(), context); this.allDataReaderList = new ArrayList<>(); this.groupByPlan = groupByPlan; for (int i = 0; i < paths.size(); i++) { Path path = paths.get(i); - allDataReaderList.add(new SeriesReaderByTimestamp(path, dataTypes.get(i), context, - QueryResourceManager.getInstance().getQueryDataSource(path, context, null), null)); + allDataReaderList.add(getReaderByTime(path, dataTypes.get(i), context, null)); } } + protected TimeGenerator getTimeGenerator(IExpression expression, QueryContext context) + throws StorageEngineException { + return new ServerTimeGenerator(expression, context); + } + + protected IReaderByTimestamp getReaderByTime(Path path, + TSDataType dataType, QueryContext context, TsFileFilter fileFilter) throws StorageEngineException { + return new SeriesReaderByTimestamp(path, dataType, context, + QueryResourceManager.getInstance().getQueryDataSource(path, context, null), fileFilter); + } + @Override protected RowRecord nextWithoutConstraint() throws IOException { 
if (!hasCachedTimeInterval) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 023cbf3..82c478a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -20,27 +20,20 @@ package org.apache.iotdb.db.query.dataset.groupby; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.factory.AggregateResultFactory; -import org.apache.iotdb.db.query.reader.series.IAggregateReader; -import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; +import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.common.RowRecord; -import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -54,7 +47,9 @@ public class 
GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { .getLogger(GroupByWithoutValueFilterDataSet.class); private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>(); - private TimeRange timeRange; + + public GroupByWithoutValueFilterDataSet() { + } /** * constructor. @@ -66,7 +61,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { initGroupBy(context, groupByPlan); } - private void initGroupBy(QueryContext context, GroupByPlan groupByPlan) + protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan) throws StorageEngineException { IExpression expression = groupByPlan.getExpression(); @@ -80,7 +75,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { if (!pathExecutors.containsKey(path)) { //init GroupByExecutor pathExecutors.put(path, - new GroupByExecutor(path, dataTypes.get(i), context, timeFilter)); + getGroupByExecutor(path, dataTypes.get(i), context, timeFilter, null)); } AggregateResult aggrResult = AggregateResultFactory .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i), @@ -97,7 +92,6 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } hasCachedTimeInterval = false; RowRecord record = new RowRecord(curStartTime); - timeRange = new TimeRange(curStartTime, curEndTime - 1); AggregateResult[] fields = new AggregateResult[paths.size()]; @@ -105,7 +99,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { for (Entry<Path, GroupByExecutor> pathGroupByExecutorEntry : pathExecutors.entrySet()) { GroupByExecutor executor = pathGroupByExecutorEntry.getValue(); executor.resetAggregateResults(); - List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult(); + List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult(curStartTime, curEndTime); for (Pair<AggregateResult, Integer> aggregation : aggregations) { fields[aggregation.right] = aggregation.left; } @@ -125,165 +119,10 @@ 
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { return record; } - private class GroupByExecutor { - - private IAggregateReader reader; - private BatchData preCachedData; - //<aggFunction - indexForRecord> of path - private List<Pair<AggregateResult, Integer>> results = new ArrayList<>(); - - GroupByExecutor(Path path, TSDataType dataType, QueryContext context, Filter timeFilter) - throws StorageEngineException { - QueryDataSource queryDataSource = QueryResourceManager.getInstance() - .getQueryDataSource(path, context, timeFilter); - // update filter by TTL - timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - this.reader = new SeriesAggregateReader(path, dataType, context, queryDataSource, timeFilter, - null, null); - this.preCachedData = null; - } - - private List<Pair<AggregateResult, Integer>> calcResult() - throws IOException, QueryProcessException { - if (calcFromCacheData()) { - return results; - } - - //read page data firstly - if (readAndCalcFromPage()) { - return results; - } - - //read chunk finally - while (reader.hasNextChunk()) { - Statistics chunkStatistics = reader.currentChunkStatistics(); - if (chunkStatistics.getStartTime() >= curEndTime) { - return results; - } - //calc from chunkMetaData - if (reader.canUseCurrentChunkStatistics() && timeRange.contains( - new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) { - calcFromStatistics(chunkStatistics); - reader.skipCurrentChunk(); - if(isEndCalc()){ - return results; - } - continue; - } - if (readAndCalcFromPage()) { - return results; - } - } - return results; - } - - private void addAggregateResult(AggregateResult aggrResult, int index) { - results.add(new Pair<>(aggrResult, index)); - } - - private boolean isEndCalc() { - for (Pair<AggregateResult, Integer> result : results) { - if (!result.left.isCalculatedAggregationResult()) { - return false; - } - } - return true; - } - - private boolean calcFromCacheData() throws IOException { 
- calcFromBatch(preCachedData); - // The result is calculated from the cache - return (preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime) - || isEndCalc(); - } - - private void calcFromBatch(BatchData batchData) throws IOException { - // is error data - if (batchData == null - || !batchData.hasCurrent() - || batchData.getMaxTimestamp() < curStartTime - || batchData.currentTime() >= curEndTime) { - return; - } - - for (Pair<AggregateResult, Integer> result : results) { - //current agg method has been calculated - if (result.left.isCalculatedAggregationResult()) { - continue; - } - //lazy reset batch data for calculation - batchData.resetBatchData(); - //skip points that cannot be calculated - while (batchData.currentTime() < curStartTime && batchData.hasCurrent()) { - batchData.next(); - } - if (batchData.hasCurrent()) { - result.left.updateResultFromPageData(batchData, curEndTime); - } - } - //can calc for next interval - if (batchData.getMaxTimestamp() >= curEndTime) { - preCachedData = batchData; - } - } - - private void calcFromStatistics(Statistics statistics) - throws QueryProcessException { - for (Pair<AggregateResult, Integer> result : results) { - //cacl is compile - if (result.left.isCalculatedAggregationResult()) { - continue; - } - result.left.updateResultFromStatistics(statistics); - } - } - - // clear all results - private void resetAggregateResults() { - for (Pair<AggregateResult, Integer> result : results) { - result.left.reset(); - } - } - - - private boolean readAndCalcFromPage() throws IOException, QueryProcessException { - while (reader.hasNextPage()) { - Statistics pageStatistics = reader.currentPageStatistics(); - //must be non overlapped page - if (pageStatistics != null) { - //current page max than time range - if (pageStatistics.getStartTime() >= curEndTime) { - return true; - } - //can use pageHeader - if (reader.canUseCurrentPageStatistics() && timeRange.contains( - new TimeRange(pageStatistics.getStartTime(), 
pageStatistics.getEndTime()))) { - calcFromStatistics(pageStatistics); - reader.skipCurrentPage(); - if (isEndCalc()) { - return true; - } - continue; - } - } - // calc from page data - BatchData batchData = reader.nextPage(); - if (batchData == null || !batchData.hasCurrent()) { - continue; - } - // stop calc and cached current batchData - if (batchData.currentTime() >= curEndTime) { - preCachedData = batchData; - return true; - } - - calcFromBatch(batchData); - if (isEndCalc() || batchData.currentTime() >= curEndTime) { - return true; - } - } - return false; - } + protected GroupByExecutor getGroupByExecutor(Path path, + TSDataType dataType, + QueryContext context, Filter timeFilter, TsFileFilter fileFilter) + throws StorageEngineException { + return new LocalGroupByExecutor(path, dataType, context, timeFilter, fileFilter); } - } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java new file mode 100644 index 0000000..a009266 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java @@ -0,0 +1,187 @@ +package org.apache.iotdb.db.query.dataset.groupby; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.IAggregateReader; +import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; +import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.utils.Pair; + +public class LocalGroupByExecutor implements GroupByExecutor { + + private IAggregateReader reader; + private BatchData preCachedData; + //<aggFunction - indexForRecord> of path + private List<Pair<AggregateResult, Integer>> results = new ArrayList<>(); + private TimeRange timeRange; + + public LocalGroupByExecutor(Path path, TSDataType dataType, QueryContext context, Filter timeFilter, + TsFileFilter fileFilter) + throws StorageEngineException { + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(path, context, timeFilter); + // update filter by TTL + timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + this.reader = new SeriesAggregateReader(path, dataType, context, queryDataSource, timeFilter, + null, fileFilter); + this.preCachedData = null; + timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE); + } + + @Override + public void addAggregateResult(AggregateResult aggrResult, int index) { + results.add(new Pair<>(aggrResult, index)); + } + + private boolean isEndCalc() { + for (Pair<AggregateResult, Integer> result : results) { + if (!result.left.isCalculatedAggregationResult()) { + return false; + } + } + return true; + } + + private boolean calcFromCacheData(long curStartTime, long curEndTime) throws IOException { + calcFromBatch(preCachedData, curStartTime, curEndTime); + // The result is calculated from the cache + return (preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime) + || isEndCalc(); + } + + private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTime) throws IOException 
{ + // is error data + if (batchData == null + || !batchData.hasCurrent() + || batchData.getMaxTimestamp() < curStartTime + || batchData.currentTime() >= curEndTime) { + return; + } + + for (Pair<AggregateResult, Integer> result : results) { + //current agg method has been calculated + if (result.left.isCalculatedAggregationResult()) { + continue; + } + //lazy reset batch data for calculation + batchData.resetBatchData(); + //skip points that cannot be calculated + while (batchData.currentTime() < curStartTime && batchData.hasCurrent()) { + batchData.next(); + } + if (batchData.hasCurrent()) { + result.left.updateResultFromPageData(batchData, curEndTime); + } + } + //can calc for next interval + if (batchData.getMaxTimestamp() >= curEndTime) { + preCachedData = batchData; + } + } + + private void calcFromStatistics(Statistics pageStatistics) + throws QueryProcessException { + for (Pair<AggregateResult, Integer> result : results) { + //cacl is compile + if (result.left.isCalculatedAggregationResult()) { + continue; + } + result.left.updateResultFromStatistics(pageStatistics); + } + } + + @Override + public List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime) + throws IOException, QueryProcessException { + timeRange.set(curStartTime, curEndTime - 1); + if (calcFromCacheData(curStartTime, curEndTime)) { + return results; + } + + //read page data firstly + if (readAndCalcFromPage(curStartTime, curEndTime)) { + return results; + } + + //read chunk finally + while (reader.hasNextChunk()) { + Statistics chunkStatistics = reader.currentChunkStatistics(); + if (chunkStatistics.getStartTime() >= curEndTime) { + return results; + } + //calc from chunkMetaData + if (reader.canUseCurrentChunkStatistics() + && timeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) { + calcFromStatistics(chunkStatistics); + reader.skipCurrentChunk(); + continue; + } + if (readAndCalcFromPage(curStartTime, curEndTime)) { + return results; + 
} + } + return results; + } + + // clear all results + @Override + public void resetAggregateResults() { + for (Pair<AggregateResult, Integer> result : results) { + result.left.reset(); + } + } + + + private boolean readAndCalcFromPage(long curStartTime, long curEndTime) throws IOException, + QueryProcessException { + while (reader.hasNextPage()) { + Statistics pageStatistics = reader.currentPageStatistics(); + //must be non overlapped page + if (pageStatistics != null) { + //current page max than time range + if (pageStatistics.getStartTime() >= curEndTime) { + return true; + } + //can use pageHeader + if (reader.canUseCurrentPageStatistics() + && timeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) { + calcFromStatistics(pageStatistics); + reader.skipCurrentPage(); + if (isEndCalc()) { + return true; + } + continue; + } + } + // calc from page data + BatchData batchData = reader.nextPage(); + if (batchData == null || !batchData.hasCurrent()) { + continue; + } + // stop calc and cached current batchData + if (batchData.currentTime() >= curEndTime) { + preCachedData = batchData; + return true; + } + + calcFromBatch(batchData, curStartTime, curEndTime); + if (isEndCalc() || batchData.currentTime() >= curEndTime) { + return true; + } + } + return false; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 6882e7b..1e63f5c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -57,7 +57,7 @@ public class AggregationExecutor { private List<Path> selectedSeries; protected List<TSDataType> dataTypes; - private List<String> aggregations; + protected List<String> aggregations; protected IExpression expression; /** @@ -65,7 +65,7 @@ public class AggregationExecutor { **/ private int 
aggregateFetchSize; - AggregationExecutor(AggregationPlan aggregationPlan) { + protected AggregationExecutor(AggregationPlan aggregationPlan) { this.selectedSeries = aggregationPlan.getDeduplicatedPaths(); this.dataTypes = aggregationPlan.getDeduplicatedDataTypes(); this.aggregations = aggregationPlan.getDeduplicatedAggregations(); @@ -78,7 +78,7 @@ public class AggregationExecutor { * * @param context query context */ - QueryDataSet executeWithoutValueFilter(QueryContext context) + public QueryDataSet executeWithoutValueFilter(QueryContext context) throws StorageEngineException, IOException, QueryProcessException { Filter timeFilter = null; @@ -109,7 +109,7 @@ public class AggregationExecutor { * @param context query context * @return AggregateResult list */ - private List<AggregateResult> aggregateOneSeries( + protected List<AggregateResult> aggregateOneSeries( Map.Entry<Path, List<Integer>> pathToAggrIndexes, Filter timeFilter, QueryContext context) throws IOException, QueryProcessException, StorageEngineException { @@ -128,7 +128,7 @@ public class AggregationExecutor { return aggregateResultList; } - private static void aggregateOneSeries(Path seriesPath, QueryContext context, Filter timeFilter, + public static void aggregateOneSeries(Path seriesPath, QueryContext context, Filter timeFilter, TSDataType tsDataType, List<AggregateResult> aggregateResultList, TsFileFilter fileFilter) throws StorageEngineException, IOException, QueryProcessException { @@ -227,7 +227,7 @@ public class AggregationExecutor { * * @param context query context. 
*/ - QueryDataSet executeWithValueFilter(QueryContext context) + public QueryDataSet executeWithValueFilter(QueryContext context) throws StorageEngineException, IOException { TimeGenerator timestampGenerator = getTimeGenerator(context); @@ -249,11 +249,11 @@ public class AggregationExecutor { return constructDataSet(aggregateResults); } - private TimeGenerator getTimeGenerator(QueryContext context) throws StorageEngineException { + protected TimeGenerator getTimeGenerator(QueryContext context) throws StorageEngineException { return new ServerTimeGenerator(expression, context); } - private IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType, + protected IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType, QueryContext context) throws StorageEngineException { return new SeriesReaderByTimestamp(path, dataType, context, diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java index a14742d..08cdde1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java @@ -88,9 +88,7 @@ public class FillQueryExecutor { } else { fill = typeIFillMap.get(dataType).copy(); } - fill.setDataType(dataType); - fill.setQueryTime(queryTime); - fill.constructReaders(path, context); + configureFill(fill, dataType, path, context, queryTime); TimeValuePair timeValuePair = fill.getFillResult(); if (timeValuePair == null || timeValuePair.getValue() == null) { @@ -104,4 +102,11 @@ public class FillQueryExecutor { dataSet.setRecord(record); return dataSet; } + + protected void configureFill(IFill fill, TSDataType dataType, Path path, QueryContext context, + long queryTime) throws StorageEngineException { + fill.setDataType(dataType); + fill.setQueryTime(queryTime); + fill.constructReaders(path, context); + } } diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 9807933..6ed625e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -138,12 +138,22 @@ public class QueryRouter implements IQueryRouter { groupByPlan.setExpression(optimizedExpression); if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { - return new GroupByWithoutValueFilterDataSet(context, groupByPlan); + return getGroupByWithoutValueFilterDataSet(context, groupByPlan); } else { - return new GroupByWithValueFilterDataSet(context, groupByPlan); + return getGroupByWithValueFilterDataSet(context, groupByPlan); } } + protected GroupByWithoutValueFilterDataSet getGroupByWithoutValueFilterDataSet(QueryContext context, GroupByPlan plan) + throws StorageEngineException { + return new GroupByWithoutValueFilterDataSet(context, plan); + } + + protected GroupByWithValueFilterDataSet getGroupByWithValueFilterDataSet(QueryContext context, GroupByPlan plan) + throws StorageEngineException { + return new GroupByWithValueFilterDataSet(context, plan); + } + @Override public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context) throws StorageEngineException, QueryProcessException, IOException { @@ -152,11 +162,18 @@ public class QueryRouter implements IQueryRouter { long queryTime = fillQueryPlan.getQueryTime(); Map<TSDataType, IFill> fillType = fillQueryPlan.getFillType(); - FillQueryExecutor fillQueryExecutor = new FillQueryExecutor(fillPaths, dataTypes, queryTime, + FillQueryExecutor fillQueryExecutor = getFillExecutor(fillPaths, dataTypes, queryTime, fillType); return fillQueryExecutor.execute(context); } + protected FillQueryExecutor getFillExecutor( + List<Path> fillPaths, + List<TSDataType> dataTypes, long queryTime, + Map<TSDataType, IFill> fillType) { + return new 
FillQueryExecutor(fillPaths, dataTypes, queryTime, fillType); + } + @Override public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context) throws StorageEngineException, QueryProcessException, IOException { diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java index f1d9a21..d8eb77b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java +++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java @@ -57,6 +57,14 @@ public abstract class IFill { timeFilter, null, null); } + public void setAllDataReader(IBatchReader allDataReader) { + this.allDataReader = allDataReader; + } + + public Filter getFilter() { + return constructFilter(); + } + public abstract TimeValuePair getFillResult() throws IOException, UnSupportedFillTypeException; public TSDataType getDataType() { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 1d7825d..ef27d37 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import java.io.IOException; import java.util.*; -class SeriesReader { +public class SeriesReader { private final Path seriesPath; private final TSDataType dataType; @@ -94,7 +94,7 @@ class SeriesReader { private boolean hasCachedNextOverlappedPage; private BatchData cachedBatchData; - SeriesReader(Path seriesPath, TSDataType dataType, QueryContext context, + public SeriesReader(Path seriesPath, TSDataType dataType, QueryContext context, QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, TsFileFilter fileFilter) { this.seriesPath = seriesPath; this.dataType = dataType; diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3ad965c..803bdf5 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -239,10 +239,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { for (long statementId : statementIds) { Set<Long> queryIds = statementId2QueryId.getOrDefault(statementId, Collections.emptySet()); for (long queryId : queryIds) { - queryId2DataSet.remove(queryId); - try { - QueryResourceManager.getInstance().endQuery(queryId); + releaseQueryResource(queryId); } catch (StorageEngineException e) { // release as many as resources as possible, so do not break as soon as one exception is // raised @@ -300,7 +298,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { /** * release single operation resource */ - private void releaseQueryResource(long queryId) throws StorageEngineException { + protected void releaseQueryResource(long queryId) throws StorageEngineException { // remove the corresponding Physical Plan queryId2DataSet.remove(queryId); QueryResourceManager.getInstance().endQuery(queryId); @@ -778,6 +776,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result List<TSDataType> deduplicatedColumnsType = new ArrayList<>(); deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result + Set<String> deduplicatedMeasurements = new LinkedHashSet<>(); Map<String, TSDataType> checker = plan.getMeasurementDataTypeMap(); @@ -812,7 +811,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { plan.setPaths(null); } - @Override public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { try { @@ -831,8 +829,7 @@ public class TSServiceImpl implements 
TSIService.Iface, ServerContext { fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId)); boolean hasResultSet = result.bufferForTime().limit() != 0; if (!hasResultSet) { - QueryResourceManager.getInstance().endQuery(req.queryId); - queryId2DataSet.remove(req.queryId); + releaseQueryResource(req.queryId); } TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); resp.setHasResultSet(hasResultSet); @@ -942,7 +939,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return queryDataSet; } - private QueryContext genQueryContext(long queryId) { + protected QueryContext genQueryContext(long queryId) { return new QueryContext(queryId); } @@ -1019,7 +1016,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser); } - void handleClientExit() { + protected void handleClientExit() { Long sessionId = currSessionId.get(); if (sessionId != null) { TSCloseSessionReq req = new TSCloseSessionReq(sessionId); @@ -1299,7 +1296,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return null; } - private TSStatus executePlan(PhysicalPlan plan) { + protected TSStatus executePlan(PhysicalPlan plan) { boolean execRet; try { execRet = executeNonQuery(plan); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java index 4109382..d89e6fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java @@ -19,9 +19,12 @@ package org.apache.iotdb.db.utils; import java.io.File; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; public class FilePathUtils { + private static final String PATH_SPLIT_STRING = File.separator.equals("\\") ? 
"\\\\" : "/"; + private FilePathUtils() { // forbidding instantiation } @@ -39,4 +42,7 @@ public class FilePathUtils { return filePath; } + public static String[] splitTsFilePath(TsFileResource resource) { + return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING); + } } diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java index 015a094..ffa3526 100644 --- a/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java @@ -34,6 +34,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.junit.After; @@ -97,4 +99,24 @@ public class ConcatOptimizerTest { ValueFilter.lt(10)); assertEquals(seriesExpression.toString(), ((RawDataQueryPlan) plan).getExpression().toString()); } + + @Test + public void testConcatMultipleDeviceInFilter() throws QueryProcessException { + String inputSQL = "select s1 from root.laptop.* where s1 < 10"; + PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL); + IExpression expression = BinaryExpression.and( + BinaryExpression.and( + new SingleSeriesExpression( + new Path("root.laptop.d1.s1"), + ValueFilter.lt(10)), + new SingleSeriesExpression( + new Path("root.laptop.d2.s1"), + ValueFilter.lt(10)) + ), + new SingleSeriesExpression( + new Path("root.laptop.d3.s1"), + ValueFilter.lt(10)) + ); + assertEquals(expression.toString(), ((RawDataQueryPlan) 
plan).getExpression().toString()); + } } \ No newline at end of file diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java index f976d0f..bebae01 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java @@ -33,6 +33,9 @@ public abstract class QueryDataSet { protected int rowOffset = 0; protected int alreadyReturnedRowNum = 0; + public QueryDataSet() { + } + public QueryDataSet(List<Path> paths, List<TSDataType> dataTypes) { this.paths = paths; this.dataTypes = dataTypes;
