This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 85888c7 [IoTDB-1936] Support other aggregations in group by fill with value filter (#4375)
85888c7 is described below
commit 85888c7395ee784795e24b45911a978e0ff7a521
Author: CRZbulabula <[email protected]>
AuthorDate: Wed Nov 17 18:58:35 2021 +0800
[IoTDB-1936] Support other aggregations in group by fill with value filter (#4375)
---
.../iotdb/cluster/query/ClusterQueryRouter.java | 9 +
.../ClusterGroupByFillNoVFilterDataSet.java | 7 +-
....java => ClusterGroupByFillVFilterDataSet.java} | 50 ++-
.../main/java/org/apache/iotdb/SessionExample.java | 28 +-
...rDataSet.java => GroupByFillEngineDataSet.java} | 380 +++++------------
.../groupby/GroupByFillWithValueFilterDataSet.java | 438 ++++++++++++++++++++
.../GroupByFillWithoutValueFilterDataSet.java | 406 ++-----------------
.../groupby/GroupByWithValueFilterDataSet.java | 4 +-
.../iotdb/db/query/executor/QueryRouter.java | 19 +-
.../iotdb/db/integration/IoTDBGroupByFillIT.java | 448 +++++++++++++++++++++
.../db/integration/IoTDBGroupByFillMixPathsIT.java | 191 +++++++++
.../dataset/groupby/GroupByFillDataSetTest.java | 170 ++++----
.../tsfile/read/filter/basic/BinaryFilter.java | 8 +
13 files changed, 1381 insertions(+), 777 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
index 61b3b41..dcfac79 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.query.aggregate.ClusterAggregateExecutor;
import org.apache.iotdb.cluster.query.fill.ClusterFillExecutor;
import
org.apache.iotdb.cluster.query.groupby.ClusterGroupByFillNoVFilterDataSet;
+import org.apache.iotdb.cluster.query.groupby.ClusterGroupByFillVFilterDataSet;
import org.apache.iotdb.cluster.query.groupby.ClusterGroupByNoVFilterDataSet;
import org.apache.iotdb.cluster.query.groupby.ClusterGroupByVFilterDataSet;
import org.apache.iotdb.cluster.query.last.ClusterLastQueryExecutor;
@@ -36,6 +37,7 @@ import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import
org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithValueFilterDataSet;
import
org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithoutValueFilterDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import
org.apache.iotdb.db.query.dataset.groupby.GroupByWithoutValueFilterDataSet;
@@ -79,6 +81,13 @@ public class ClusterQueryRouter extends QueryRouter {
}
@Override
+ protected GroupByFillWithValueFilterDataSet
getGroupByFillWithValueFilterDataSet(
+ QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan)
+ throws QueryProcessException, StorageEngineException {
+ return new ClusterGroupByFillVFilterDataSet(context, groupByTimeFillPlan,
metaGroupMember);
+ }
+
+ @Override
protected GroupByFillWithoutValueFilterDataSet
getGroupByFillWithoutValueFilterDataSet(
QueryContext context, GroupByTimeFillPlan groupByFillPlan)
throws QueryProcessException, StorageEngineException {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
index fe339f7..ff74c96 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
@@ -19,14 +19,11 @@
package org.apache.iotdb.cluster.query.groupby;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
import
org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithoutValueFilterDataSet;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -40,8 +37,7 @@ public class ClusterGroupByFillNoVFilterDataSet extends
GroupByFillWithoutValueF
public ClusterGroupByFillNoVFilterDataSet(
QueryContext context,
GroupByTimeFillPlan groupByTimeFillPlan,
- MetaGroupMember metaGroupMember)
- throws QueryProcessException, StorageEngineException {
+ MetaGroupMember metaGroupMember) {
super(context, groupByTimeFillPlan);
this.metaGroupMember = metaGroupMember;
}
@@ -53,7 +49,6 @@ public class ClusterGroupByFillNoVFilterDataSet extends
GroupByFillWithoutValueF
TSDataType dataType,
QueryContext context,
Filter timeFilter,
- TsFileFilter fileFilter,
boolean ascending) {
return new MergeGroupByExecutor(
path, deviceMeasurements, dataType, context, timeFilter,
metaGroupMember, ascending);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillVFilterDataSet.java
similarity index 53%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
copy to
cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillVFilterDataSet.java
index fe339f7..f47ee91 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillVFilterDataSet.java
@@ -18,44 +18,50 @@
*/
package org.apache.iotdb.cluster.query.groupby;
+import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
+import org.apache.iotdb.cluster.query.reader.ClusterTimeGenerator;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
-import
org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithoutValueFilterDataSet;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
+import
org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithValueFilterDataSet;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import java.util.Set;
+public class ClusterGroupByFillVFilterDataSet extends
GroupByFillWithValueFilterDataSet {
-/** the cluster implementation of GroupByFillWithoutValueFilterDataSet */
-public class ClusterGroupByFillNoVFilterDataSet extends
GroupByFillWithoutValueFilterDataSet {
+ private final MetaGroupMember metaGroupMember;
+ private final ClusterReaderFactory readerFactory;
- private MetaGroupMember metaGroupMember;
-
- public ClusterGroupByFillNoVFilterDataSet(
+ public ClusterGroupByFillVFilterDataSet(
QueryContext context,
GroupByTimeFillPlan groupByTimeFillPlan,
- MetaGroupMember metaGroupMember)
- throws QueryProcessException, StorageEngineException {
+ MetaGroupMember metaGroupMember) {
super(context, groupByTimeFillPlan);
this.metaGroupMember = metaGroupMember;
+ this.readerFactory = new ClusterReaderFactory(metaGroupMember);
}
@Override
- protected GroupByExecutor getGroupByExecutor(
- PartialPath path,
- Set<String> deviceMeasurements,
- TSDataType dataType,
- QueryContext context,
- Filter timeFilter,
- TsFileFilter fileFilter,
- boolean ascending) {
- return new MergeGroupByExecutor(
- path, deviceMeasurements, dataType, context, timeFilter,
metaGroupMember, ascending);
+ protected TimeGenerator getTimeGenerator(QueryContext context,
RawDataQueryPlan rawDataQueryPlan)
+ throws StorageEngineException {
+ return new ClusterTimeGenerator(context, metaGroupMember,
rawDataQueryPlan, false);
+ }
+
+ @Override
+ protected IReaderByTimestamp getReaderByTime(
+ PartialPath path, RawDataQueryPlan dataQueryPlan, TSDataType dataType,
QueryContext context)
+ throws StorageEngineException, QueryProcessException {
+ return readerFactory.getReaderByTimestamp(
+ path,
+ dataQueryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ context,
+ dataQueryPlan.isAscending(),
+ null);
}
}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 565d706..cab736d 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -75,20 +75,20 @@ public class SessionExample {
createMultiTimeseries();
insertRecord();
insertTablet();
- insertTabletWithNullValues();
- insertTablets();
- insertRecords();
- selectInto();
- createAndDropContinuousQueries();
- nonQuery();
- query();
- queryWithTimeout();
- rawDataQuery();
- lastDataQuery();
- queryByIterator();
- deleteData();
- deleteTimeseries();
- setTimeout();
+ // insertTabletWithNullValues();
+ // insertTablets();
+ // insertRecords();
+ // selectInto();
+ // createAndDropContinuousQueries();
+ // nonQuery();
+ // query();
+ // queryWithTimeout();
+ // rawDataQuery();
+ // lastDataQuery();
+ // queryByIterator();
+ // deleteData();
+ // deleteTimeseries();
+ // setTimeout();
sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
sessionEnableRedirect.setEnableQueryRedirection(true);
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java
similarity index 53%
copy from
server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
copy to
server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java
index 8a7705e..6202fc9 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.query.dataset.groupby;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
@@ -32,19 +30,13 @@ import org.apache.iotdb.db.query.executor.fill.IFill;
import org.apache.iotdb.db.query.executor.fill.LinearFill;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.query.executor.fill.ValueFill;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -52,75 +44,60 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
-
-public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFilterDataSet {
- private static final Logger logger =
- LoggerFactory.getLogger(GroupByFillWithoutValueFilterDataSet.class);
+public abstract class GroupByFillEngineDataSet extends GroupByEngineDataSet {
- private Map<TSDataType, IFill> fillTypes;
- private final List<PartialPath> deduplicatedPaths;
- private final List<String> aggregations;
- private Map<PartialPath, GroupByExecutor> extraPreviousExecutors = null;
- private Map<PartialPath, GroupByExecutor> extraNextExecutors = null;
+ protected Map<TSDataType, IFill> fillTypes;
+ protected final List<PartialPath> deduplicatedPaths;
+ protected final List<String> aggregations;
+ protected final Map<PartialPath, List<Integer>> resultIndexes = new
HashMap<>();
// the extra previous means first not null value before startTime
// used to fill result before the first not null data
- private Object[] extraPreviousValues;
- private long[] extraPreviousTimes;
+ protected Object[] extraPreviousValues;
+ protected long[] extraPreviousTimes;
// the previous value for each time series, which means
// first not null value GEQ curStartTime in order asc
// second not null value GEQ curStartTime in order desc
- private Object[] previousValues;
- private long[] previousTimes;
-
- // the extra next means first not null value after endTime
- // used to fill result after the last not null data
- private Object[] extraNextValues;
- private long[] extraNextTimes;
+ protected Object[] previousValues;
+ protected long[] previousTimes;
// the next value for each time series, which means
// first not null value LEQ curStartTime in order desc
// second not null value LEQ curStartTime in order asc
- private Object[] nextValues;
- private long[] nextTimes;
+ protected Object[] nextValues;
+ protected long[] nextTimes;
+
+ // the extra next means first not null value after endTime
+ // used to fill result after the last not null data
+ protected Object[] extraNextValues;
+ protected long[] extraNextTimes;
// the result datatype for each time series
- private TSDataType[] resultDataType;
+ protected TSDataType[] resultDataType;
// the next query time range of each path
- private long[] queryStartTimes;
- private long[] queryEndTimes;
- private boolean[] hasCachedQueryInterval;
+ protected long[] queryStartTimes;
+ protected long[] queryEndTimes;
+ protected boolean[] hasCachedQueryInterval;
- public GroupByFillWithoutValueFilterDataSet(
- QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan)
- throws QueryProcessException, StorageEngineException {
+ public GroupByFillEngineDataSet(QueryContext context, GroupByTimeFillPlan
groupByTimeFillPlan) {
super(context, groupByTimeFillPlan);
this.aggregations = groupByTimeFillPlan.getDeduplicatedAggregations();
+ this.fillTypes = groupByTimeFillPlan.getFillType();
+
this.deduplicatedPaths = new ArrayList<>();
- for (Path path : paths) {
- PartialPath partialPath = (PartialPath) path;
- if (!deduplicatedPaths.contains(partialPath)) {
- deduplicatedPaths.add(partialPath);
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = (PartialPath) paths.get(i);
+ if (!deduplicatedPaths.contains(path)) {
+ deduplicatedPaths.add(path);
+ resultIndexes.put(path, new ArrayList<>());
}
+ resultIndexes.get(path).add(i);
}
- }
- public void init(QueryContext context, GroupByTimeFillPlan
groupByTimeFillPlan)
- throws QueryProcessException, StorageEngineException {
- initGroupBy(context, groupByTimeFillPlan);
initArrays();
- initExtraExecutors(context, groupByTimeFillPlan);
- if (extraPreviousExecutors != null) {
- initExtraArrays(extraPreviousValues, extraPreviousTimes, true,
extraPreviousExecutors);
- }
- if (extraNextExecutors != null) {
- initExtraArrays(extraNextValues, extraNextTimes, false,
extraNextExecutors);
- }
- initCachedTimesAndValues();
}
private void initArrays() {
@@ -141,9 +118,9 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
Arrays.fill(nextValues, null);
Arrays.fill(nextTimes, Long.MAX_VALUE);
- queryStartTimes = new long[paths.size()];
- queryEndTimes = new long[paths.size()];
- hasCachedQueryInterval = new boolean[paths.size()];
+ queryStartTimes = new long[deduplicatedPaths.size()];
+ queryEndTimes = new long[deduplicatedPaths.size()];
+ hasCachedQueryInterval = new boolean[deduplicatedPaths.size()];
resultDataType = new TSDataType[aggregations.size()];
Arrays.fill(queryStartTimes, curStartTime);
Arrays.fill(queryEndTimes, curEndTime);
@@ -172,96 +149,20 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
}
}
- private void getGroupByExecutors(
- Map<PartialPath, GroupByExecutor> extraExecutors,
- QueryContext context,
- GroupByTimeFillPlan groupByTimeFillPlan,
- Filter timeFilter,
- boolean isAscending)
- throws StorageEngineException, QueryProcessException {
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
- try {
- // init resultIndexes, group result indexes by path
- for (int i = 0; i < paths.size(); i++) {
- PartialPath path = (PartialPath) paths.get(i);
- if (!extraExecutors.containsKey(path)) {
- // init GroupByExecutor
- extraExecutors.put(
- path,
- getGroupByExecutor(
- path,
-
groupByTimeFillPlan.getAllMeasurementsInDevice(path.getDevice()),
- dataTypes.get(i),
- context,
- timeFilter.copy(),
- null,
- isAscending));
- }
- AggregateResult aggrResult =
- AggregateResultFactory.getAggrResultByName(
- groupByTimeFillPlan.getDeduplicatedAggregations().get(i),
- dataTypes.get(i),
- ascending);
- extraExecutors.get(path).addAggregateResult(aggrResult);
+ /* Cache the previous and next query data before group by fill query */
+ protected void initCachedTimesAndValues() throws QueryProcessException {
+ for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) {
+ try {
+ pathSlide(pathId);
+ pathSlide(pathId);
+ } catch (IOException e) {
+ throw new QueryProcessException(e.getMessage());
}
- } finally {
- StorageEngine.getInstance().mergeUnLock(list);
- }
- }
-
- /* Init extra path executors to query data outside the original group by
query */
- private void initExtraExecutors(QueryContext context, GroupByTimeFillPlan
groupByTimeFillPlan)
- throws StorageEngineException, QueryProcessException {
- long minQueryStartTime = Long.MAX_VALUE;
- long maxQueryEndTime = Long.MIN_VALUE;
- this.fillTypes = groupByTimeFillPlan.getFillType();
- for (Map.Entry<TSDataType, IFill> IFillEntry : fillTypes.entrySet()) {
- IFill fill = IFillEntry.getValue();
- if (fill instanceof PreviousFill) {
- fill.convertRange(startTime, endTime);
- minQueryStartTime = Math.min(minQueryStartTime,
fill.getQueryStartTime());
- } else if (fill instanceof LinearFill) {
- fill.convertRange(startTime, endTime);
- minQueryStartTime = Math.min(minQueryStartTime,
fill.getQueryStartTime());
- maxQueryEndTime = Math.max(maxQueryEndTime, fill.getQueryEndTime());
- }
- }
-
- if (minQueryStartTime < Long.MAX_VALUE) {
- extraPreviousExecutors = new HashMap<>();
-
- long queryRange = minQueryStartTime - startTime;
- long extraStartTime, intervalNum;
- if (isSlidingStepByMonth) {
- intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep *
MS_TO_MONTH));
- extraStartTime = calcIntervalByMonth(startTime, intervalNum *
slidingStep);
- while (extraStartTime < minQueryStartTime) {
- intervalNum += 1;
- extraStartTime = calcIntervalByMonth(startTime, intervalNum *
slidingStep);
- }
- } else {
- intervalNum = (long) Math.ceil(queryRange / (double) slidingStep);
- extraStartTime = slidingStep * intervalNum + startTime;
- }
-
- Filter timeFilter = new GroupByFilter(interval, slidingStep,
extraStartTime, startTime);
- getGroupByExecutors(extraPreviousExecutors, context,
groupByTimeFillPlan, timeFilter, false);
- }
-
- if (maxQueryEndTime > Long.MIN_VALUE) {
- extraNextExecutors = new HashMap<>();
- Pair<Long, Long> lastTimeRange = getLastTimeRange();
- lastTimeRange = getNextTimeRange(lastTimeRange.left, true, false);
- Filter timeFilter =
- new GroupByFilter(interval, slidingStep, lastTimeRange.left,
maxQueryEndTime);
- getGroupByExecutors(extraNextExecutors, context, groupByTimeFillPlan,
timeFilter, true);
}
}
/* check if specified path has next extra range */
- private boolean pathHasExtra(int pathId, boolean isExtraPrevious, long
extraStartTime) {
+ protected boolean pathHasExtra(int pathId, boolean isExtraPrevious, long
extraStartTime) {
List<Integer> Indexes = resultIndexes.get(deduplicatedPaths.get(pathId));
for (int resultIndex : Indexes) {
if (isExtraPrevious && extraPreviousValues[resultIndex] != null) {
@@ -294,48 +195,17 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
return false;
}
- private void initExtraArrays(
- Object[] extraValues,
- long[] extraTimes,
- boolean isExtraPrevious,
- Map<PartialPath, GroupByExecutor> extraExecutors)
- throws QueryProcessException {
- for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) {
- GroupByExecutor executor =
extraExecutors.get(deduplicatedPaths.get(pathId));
- List<Integer> Indexes = resultIndexes.get(deduplicatedPaths.get(pathId));
-
- Pair<Long, Long> extraTimeRange;
- if (isExtraPrevious) {
- extraTimeRange = getFirstTimeRange();
- } else {
- extraTimeRange = getLastTimeRange();
- }
-
- extraTimeRange = getNextTimeRange(extraTimeRange.left, !isExtraPrevious,
false);
- try {
- while (pathHasExtra(pathId, isExtraPrevious, extraTimeRange.left)) {
- List<AggregateResult> aggregations =
- executor.calcResult(extraTimeRange.left, extraTimeRange.right);
- if (!resultIsNull(aggregations)) {
- // we check extra time range in single path together,
- // thus the extra result will be cached together
- for (int i = 0; i < aggregations.size(); i++) {
- if (extraValues[Indexes.get(i)] == null) {
- extraValues[Indexes.get(i)] = aggregations.get(i).getResult();
- extraTimes[Indexes.get(i)] = extraTimeRange.left;
- }
- }
- }
-
- extraTimeRange = getNextTimeRange(extraTimeRange.left,
!isExtraPrevious, false);
- }
- } catch (IOException e) {
- throw new QueryProcessException(e.getMessage());
- }
+ /* If result is null or CountAggrResult is 0, then result is NULL */
+ protected boolean resultIsNull(List<AggregateResult> aggregateResults) {
+ AggregateResult result = aggregateResults.get(0);
+ if (result.getResult() == null) {
+ return true;
+ } else {
+ return result instanceof CountAggrResult && (long) result.getResult() ==
0;
}
}
- private boolean pathHasNext(int pathId) {
+ protected boolean pathHasNext(int pathId) {
// has cached
if (hasCachedQueryInterval[pathId]) {
return true;
@@ -353,105 +223,23 @@ public class GroupByFillWithoutValueFilterDataSet
extends GroupByWithoutValueFil
return true;
}
- /* If result is null or CountAggrResult is 0, then result is NULL */
- private boolean resultIsNull(List<AggregateResult> aggregateResults) {
- AggregateResult result = aggregateResults.get(0);
- if (result.getResult() == null) {
- return true;
- } else {
- return result instanceof CountAggrResult && (long) result.getResult() ==
0;
- }
- }
-
- private void pathGetNext(int pathId) throws IOException {
- GroupByExecutor executor =
pathExecutors.get(deduplicatedPaths.get(pathId));
- List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
-
- // Slide value and time
- pathSlideNext(pathId);
-
- List<AggregateResult> aggregations;
- try {
- // get second not null aggregate results
- aggregations = executor.calcResult(queryStartTimes[pathId],
queryEndTimes[pathId]);
- hasCachedQueryInterval[pathId] = false;
- while (resultIsNull(aggregations) && pathHasNext(pathId)) {
- aggregations = executor.calcResult(queryStartTimes[pathId],
queryEndTimes[pathId]);
- hasCachedQueryInterval[pathId] = false;
- }
- } catch (QueryProcessException e) {
- logger.error("GroupByFillWithoutValueFilterDataSet execute has error: ",
e);
- throw new IOException(e.getMessage(), e);
- }
-
- if (resultIsNull(aggregations)) {
- pathSlide(pathId);
- } else {
- for (int i = 0; i < aggregations.size(); i++) {
- int Index = resultIndex.get(i);
- if (ascending) {
- nextValues[Index] = aggregations.get(i).getResult();
- nextTimes[Index] = queryStartTimes[pathId];
- } else {
- previousValues[Index] = aggregations.get(i).getResult();
- previousTimes[Index] = queryStartTimes[pathId];
- }
- }
- }
-
- hasCachedQueryInterval[pathId] = false;
- }
-
- private void pathSlideNext(int pathId) {
- List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
- if (ascending) {
- for (int resultId : resultIndex) {
- previousValues[resultId] = nextValues[resultId];
- previousTimes[resultId] = nextTimes[resultId];
- nextValues[resultId] = null;
- nextTimes[resultId] = Long.MAX_VALUE;
- }
- } else {
- for (int resultId : resultIndex) {
- nextValues[resultId] = previousValues[resultId];
- nextTimes[resultId] = previousTimes[resultId];
- previousValues[resultId] = null;
- previousTimes[resultId] = Long.MIN_VALUE;
- }
- }
- }
-
- private void pathSlideExtra(int pathId) {
- List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
- if (ascending) {
- for (int Index : resultIndex) {
- nextValues[Index] = extraNextValues[Index];
- nextTimes[Index] = extraNextTimes[Index];
- }
- } else {
- for (int Index : resultIndex) {
- previousValues[Index] = extraPreviousValues[Index];
- previousTimes[Index] = extraPreviousTimes[Index];
- }
- }
- }
+ protected abstract void pathGetNext(int pathId) throws IOException;
- private void pathSlide(int pathId) throws IOException {
+ protected void pathSlide(int pathId) throws IOException {
if (pathHasNext(pathId)) {
pathGetNext(pathId);
} else {
- pathSlideExtra(pathId);
- }
- }
-
- /* Cache the previous and next query data before group by fill query */
- private void initCachedTimesAndValues() throws QueryProcessException {
- for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) {
- try {
- pathSlide(pathId);
- pathSlide(pathId);
- } catch (IOException e) {
- throw new QueryProcessException(e.getMessage());
+ List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
+ if (ascending) {
+ for (int Index : resultIndex) {
+ nextValues[Index] = extraNextValues[Index];
+ nextTimes[Index] = extraNextTimes[Index];
+ }
+ } else {
+ for (int Index : resultIndex) {
+ previousValues[Index] = extraPreviousValues[Index];
+ previousTimes[Index] = extraPreviousTimes[Index];
+ }
}
}
}
@@ -577,4 +365,48 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
}
return record;
}
+
+ // Slide cached value and time
+ protected void cacheSlideNext(List<Integer> cacheIndexes) {
+ if (ascending) {
+ for (int cacheId : cacheIndexes) {
+ previousValues[cacheId] = nextValues[cacheId];
+ previousTimes[cacheId] = nextTimes[cacheId];
+ nextValues[cacheId] = null;
+ nextTimes[cacheId] = Long.MAX_VALUE;
+ }
+ } else {
+ for (int cacheId : cacheIndexes) {
+ nextValues[cacheId] = previousValues[cacheId];
+ nextTimes[cacheId] = previousTimes[cacheId];
+ previousValues[cacheId] = null;
+ previousTimes[cacheId] = Long.MIN_VALUE;
+ }
+ }
+ }
+
+ // Fill cached value and time
+ protected void cacheFillNext(
+ int pathId, List<AggregateResult> aggregateResults, List<Integer>
cacheIndexes)
+ throws IOException {
+ if (resultIsNull(aggregateResults)) {
+ pathSlide(pathId);
+ } else {
+ for (int i = 0; i < aggregateResults.size(); i++) {
+ int Index = cacheIndexes.get(i);
+ if (ascending) {
+ nextValues[Index] = aggregateResults.get(i).getResult();
+ nextTimes[Index] = queryStartTimes[pathId];
+ } else {
+ previousValues[Index] = aggregateResults.get(i).getResult();
+ previousTimes[Index] = queryStartTimes[pathId];
+ }
+ }
+ }
+ }
+
+ @Override
+ public Pair<Long, Object> peekNextNotNullValue(Path path, int i) throws
IOException {
+ throw new IOException("Group by fill doesn't support peek next");
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithValueFilterDataSet.java
new file mode 100644
index 0000000..4ed61cc
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithValueFilterDataSet.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.executor.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.LinearFill;
+import org.apache.iotdb.db.query.executor.fill.PreviousFill;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.BinaryFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class GroupByFillWithValueFilterDataSet extends
GroupByFillEngineDataSet {
+
+ // time generators of each path for the query range and the extra previous/next ranges
+ private List<TimeGenerator> timestampGenerators;
+ private List<TimeGenerator> extraPreviousGenerators;
+ private List<TimeGenerator> extraNextGenerators;
+
+ // data reader lists
+ private List<IReaderByTimestamp> allDataReaderList;
+ private List<IReaderByTimestamp> extraPreviousDataReaderList;
+ private List<IReaderByTimestamp> extraNextDataReaderList;
+
+ // cached timestamp for next group by partition
+ private long lastTimestamp;
+ private List<LinkedList<Long>> cachedTimestamps;
+
+ private final int timeStampFetchSize =
IoTDBDescriptor.getInstance().getConfig().getBatchSize();
+
+ /** constructor. */
+ public GroupByFillWithValueFilterDataSet(
+ QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan) {
+ super(context, groupByTimeFillPlan);
+ }
+
+ public void init(QueryContext context, GroupByTimeFillPlan
groupByTimeFillPlan)
+ throws QueryProcessException, StorageEngineException {
+ initReadersByTimestamp(context, groupByTimeFillPlan);
+
+ initExtraReadersByTimestamp(context, groupByTimeFillPlan);
+ if (extraPreviousGenerators != null) {
+ initExtraArrays(extraPreviousValues, extraPreviousTimes, true,
extraPreviousGenerators);
+ }
+ if (extraNextGenerators != null) {
+ initExtraArrays(extraNextValues, extraNextTimes, false,
extraNextGenerators);
+ }
+
+ initCachedTimesAndValues();
+ }
+
+ private void initReadersByTimestamp(QueryContext context,
GroupByTimeFillPlan groupByTimeFillPlan)
+ throws QueryProcessException, StorageEngineException {
+ this.timestampGenerators = new ArrayList<>();
+ this.cachedTimestamps = new ArrayList<>();
+ for (int i = 0; i < deduplicatedPaths.size(); i++) {
+ timestampGenerators.add(getTimeGenerator(context, groupByTimeFillPlan));
+ cachedTimestamps.add(new LinkedList<>());
+ }
+
+ this.allDataReaderList = new ArrayList<>();
+ List<StorageGroupProcessor> list =
+ StorageEngine.getInstance()
+ .mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
+ try {
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = (PartialPath) paths.get(i);
+ allDataReaderList.add(
+ getReaderByTime(path, groupByTimeFillPlan, dataTypes.get(i),
context));
+ }
+ } finally {
+ StorageEngine.getInstance().mergeUnLock(list);
+ }
+ }
+
+ protected TimeGenerator getTimeGenerator(QueryContext context,
RawDataQueryPlan queryPlan)
+ throws StorageEngineException {
+ return new ServerTimeGenerator(context, queryPlan);
+ }
+
+ protected IReaderByTimestamp getReaderByTime(
+ PartialPath path, RawDataQueryPlan queryPlan, TSDataType dataType,
QueryContext context)
+ throws StorageEngineException, QueryProcessException {
+ return new SeriesReaderByTimestamp(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ context,
+ QueryResourceManager.getInstance().getQueryDataSource(path, context,
null),
+ null,
+ ascending);
+ }
+
+ private void replaceGroupByFilter(IExpression expression, Filter timeFilter)
+ throws QueryProcessException, IllegalPathException {
+ if (expression instanceof SingleSeriesExpression) {
+ if (((SingleSeriesExpression) expression).getFilter() instanceof
GroupByFilter) {
+ ((SingleSeriesExpression) expression).setFilter(timeFilter);
+ } else if (((SingleSeriesExpression) expression).getFilter() instanceof
BinaryFilter) {
+ if (((BinaryFilter) ((SingleSeriesExpression)
expression).getFilter()).getLeft()
+ instanceof GroupByFilter) {
+ ((BinaryFilter) ((SingleSeriesExpression)
expression).getFilter()).setLeft(timeFilter);
+ } else if (((BinaryFilter) ((SingleSeriesExpression)
expression).getFilter()).getRight()
+ instanceof GroupByFilter) {
+ ((BinaryFilter) ((SingleSeriesExpression)
expression).getFilter()).setRight(timeFilter);
+ }
+ } else {
+ throw new QueryProcessException("unknown filter type, can't replace
group by filter");
+ }
+ } else if (expression instanceof BinaryExpression) {
+ replaceGroupByFilter(((BinaryExpression) expression).getLeft(),
timeFilter);
+ replaceGroupByFilter(((BinaryExpression) expression).getRight(),
timeFilter);
+ } else {
+ throw new QueryProcessException("unknown expression type, can't replace
group by filter");
+ }
+ }
+
+ // get new expression that can query extra range
+ private IExpression getNewExpression(GroupByTimeFillPlan
groupByTimeFillPlan, Filter timeFilter)
+ throws QueryProcessException {
+ IExpression newExpression = groupByTimeFillPlan.getExpression().clone();
+ try {
+ replaceGroupByFilter(newExpression, timeFilter);
+ } catch (IllegalPathException ignore) {
+ // ignored
+ }
+ return newExpression;
+ }
+
+ /* Init extra time generators and readers to query data outside the original group by
query */
+ private void initExtraReadersByTimestamp(
+ QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan)
+ throws StorageEngineException, QueryProcessException {
+ long minQueryStartTime = Long.MAX_VALUE;
+ long maxQueryEndTime = Long.MIN_VALUE;
+ for (Map.Entry<TSDataType, IFill> IFillEntry : fillTypes.entrySet()) {
+ IFill fill = IFillEntry.getValue();
+ if (fill instanceof PreviousFill) {
+ fill.convertRange(startTime, endTime);
+ minQueryStartTime = Math.min(minQueryStartTime,
fill.getQueryStartTime());
+ } else if (fill instanceof LinearFill) {
+ fill.convertRange(startTime, endTime);
+ minQueryStartTime = Math.min(minQueryStartTime,
fill.getQueryStartTime());
+ maxQueryEndTime = Math.max(maxQueryEndTime, fill.getQueryEndTime());
+ }
+ }
+
+ if (minQueryStartTime < Long.MAX_VALUE) {
+ extraPreviousGenerators = new ArrayList<>();
+
+ long queryRange = minQueryStartTime - startTime;
+ long extraStartTime, intervalNum;
+ if (isSlidingStepByMonth) {
+ intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep *
MS_TO_MONTH));
+ extraStartTime = calcIntervalByMonth(startTime, intervalNum *
slidingStep);
+ while (extraStartTime < minQueryStartTime) {
+ intervalNum += 1;
+ extraStartTime = calcIntervalByMonth(startTime, intervalNum *
slidingStep);
+ }
+ } else {
+ intervalNum = (long) Math.ceil(queryRange / (double) slidingStep);
+ extraStartTime = slidingStep * intervalNum + startTime;
+ }
+
+ Filter timeFilter = new GroupByFilter(interval, slidingStep,
extraStartTime, startTime);
+ IExpression newExpression = getNewExpression(groupByTimeFillPlan,
timeFilter);
+ groupByTimeFillPlan.setExpression(newExpression);
+ for (int i = 0; i < deduplicatedPaths.size(); i++) {
+ extraPreviousGenerators.add(getTimeGenerator(context,
groupByTimeFillPlan));
+ }
+ }
+
+ if (maxQueryEndTime > Long.MIN_VALUE) {
+ extraNextGenerators = new ArrayList<>();
+ Pair<Long, Long> lastTimeRange = getLastTimeRange();
+ lastTimeRange = getNextTimeRange(lastTimeRange.left, true, false);
+
+ Filter timeFilter =
+ new GroupByFilter(interval, slidingStep, lastTimeRange.left,
maxQueryEndTime);
+ IExpression newExpression = getNewExpression(groupByTimeFillPlan,
timeFilter);
+ groupByTimeFillPlan.setExpression(newExpression);
+ for (int i = 0; i < deduplicatedPaths.size(); i++) {
+ extraNextGenerators.add(getTimeGenerator(context,
groupByTimeFillPlan));
+ }
+ }
+
+ extraPreviousDataReaderList = new ArrayList<>();
+ extraNextDataReaderList = new ArrayList<>();
+ List<StorageGroupProcessor> list =
+ StorageEngine.getInstance()
+ .mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
+ try {
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = (PartialPath) paths.get(i);
+ extraPreviousDataReaderList.add(
+ getReaderByTime(path, groupByTimeFillPlan, dataTypes.get(i),
context));
+ extraNextDataReaderList.add(
+ getReaderByTime(path, groupByTimeFillPlan, dataTypes.get(i),
context));
+ }
+ } finally {
+ StorageEngine.getInstance().mergeUnLock(list);
+ }
+ }
+
+ private void initExtraArrays(
+ Object[] extraValues,
+ long[] extraTimes,
+ boolean isExtraPrevious,
+ List<TimeGenerator> extraGenerators)
+ throws QueryProcessException {
+ for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) {
+ List<Integer> Indexes = resultIndexes.get(deduplicatedPaths.get(pathId));
+
+ Pair<Long, Long> extraTimeRange;
+ if (isExtraPrevious) {
+ extraTimeRange = getFirstTimeRange();
+ } else {
+ extraTimeRange = getLastTimeRange();
+ }
+
+ LinkedList<Long> cachedTimestamp = new LinkedList<>();
+ extraTimeRange = getNextTimeRange(extraTimeRange.left, !isExtraPrevious,
false);
+ try {
+ while (pathHasExtra(pathId, isExtraPrevious, extraTimeRange.left)) {
+ List<AggregateResult> aggregateResults =
+ calcResult(
+ pathId,
+ extraTimeRange,
+ extraGenerators.get(pathId),
+ cachedTimestamp,
+ isExtraPrevious ? extraPreviousDataReaderList :
extraNextDataReaderList,
+ !isExtraPrevious);
+ if (!resultIsNull(aggregateResults)) {
+ // the extra time ranges of a single path are checked together,
+ // thus the extra results of that path are cached together
+ for (int i = 0; i < aggregateResults.size(); i++) {
+ if (extraValues[Indexes.get(i)] == null) {
+ extraValues[Indexes.get(i)] =
aggregateResults.get(i).getResult();
+ extraTimes[Indexes.get(i)] = extraTimeRange.left;
+ }
+ }
+ }
+
+ extraTimeRange = getNextTimeRange(extraTimeRange.left,
!isExtraPrevious, false);
+ }
+ } catch (IOException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
+ }
+
+ private int constructTimeArrayForOneCal(
+ long[] timestampArray,
+ int timeArrayLength,
+ Pair<Long, Long> timeRange,
+ boolean isAscending,
+ TimeGenerator timeGenerator,
+ LinkedList<Long> cachedTimestamp)
+ throws IOException {
+
+ for (int cnt = 1;
+ cnt < timeStampFetchSize - 1 && (!cachedTimestamp.isEmpty() ||
timeGenerator.hasNext());
+ cnt++) {
+ if (!cachedTimestamp.isEmpty()) {
+ lastTimestamp = cachedTimestamp.remove();
+ } else {
+ lastTimestamp = timeGenerator.next();
+ }
+ if (isAscending && lastTimestamp < timeRange.right) {
+ timestampArray[timeArrayLength++] = lastTimestamp;
+ } else if (!isAscending && lastTimestamp >= timeRange.left) {
+ timestampArray[timeArrayLength++] = lastTimestamp;
+ } else {
+ // lastTimestamp may have been taken from the cache
+ if (!cachedTimestamp.isEmpty() && lastTimestamp <=
cachedTimestamp.peek()) {
+ cachedTimestamp.addFirst(lastTimestamp);
+ } else {
+ cachedTimestamp.add(lastTimestamp);
+ }
+ break;
+ }
+ }
+ return timeArrayLength;
+ }
+
+ private List<AggregateResult> calcResult(
+ int pathId,
+ Pair<Long, Long> timeRange,
+ TimeGenerator timeGenerator,
+ LinkedList<Long> cachedTimestamp,
+ List<IReaderByTimestamp> dataReaderList,
+ boolean isAscending)
+ throws IOException {
+ List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
+ List<AggregateResult> aggregateResults = new ArrayList<>();
+
+ for (int index : resultIndex) {
+ aggregateResults.add(
+ AggregateResultFactory.getAggrResultByName(
+ aggregations.get(index), dataTypes.get(index), ascending));
+ }
+
+ long[] timestampArray = new long[timeStampFetchSize];
+ int timeArrayLength = 0;
+
+ if (!cachedTimestamp.isEmpty()) {
+ long timestamp = cachedTimestamp.remove();
+ if (timestamp < timeRange.right) {
+ if (!isAscending && timestamp < timeRange.left) {
+ cachedTimestamp.addFirst(timestamp);
+ return aggregateResults;
+ }
+ if (timestamp >= timeRange.left) {
+ timestampArray[timeArrayLength++] = timestamp;
+ }
+ } else {
+ cachedTimestamp.addFirst(timestamp);
+ return aggregateResults;
+ }
+ }
+
+ while (!cachedTimestamp.isEmpty() || timeGenerator.hasNext()) {
+ // construct timestamp array
+ timeArrayLength =
+ constructTimeArrayForOneCal(
+ timestampArray,
+ timeArrayLength,
+ timeRange,
+ isAscending,
+ timeGenerator,
+ cachedTimestamp);
+
+ // calculate result using timestamp array
+ for (int i = 0; i < resultIndex.size(); i++) {
+ aggregateResults
+ .get(i)
+ .updateResultUsingTimestamps(
+ timestampArray, timeArrayLength,
dataReaderList.get(resultIndex.get(i)));
+ }
+
+ timeArrayLength = 0;
+ // check whether the end of the time range has been reached
+ if ((isAscending && lastTimestamp >= timeRange.right)
+ || (!isAscending && lastTimestamp < timeRange.left)) {
+ break;
+ }
+ }
+
+ if (timeArrayLength > 0) {
+ // calculate result using timestamp array
+ for (int i = 0; i < resultIndex.size(); i++) {
+ aggregateResults
+ .get(i)
+ .updateResultUsingTimestamps(
+ timestampArray, timeArrayLength,
dataReaderList.get(resultIndex.get(i)));
+ }
+ }
+ return aggregateResults;
+ }
+
+ @Override
+ protected void pathGetNext(int pathId) throws IOException {
+ List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
+
+ cacheSlideNext(resultIndex);
+
+ // get second not null aggregate results
+ List<AggregateResult> aggregateResults;
+ aggregateResults =
+ calcResult(
+ pathId,
+ new Pair<>(queryStartTimes[pathId], queryEndTimes[pathId]),
+ timestampGenerators.get(pathId),
+ cachedTimestamps.get(pathId),
+ allDataReaderList,
+ ascending);
+ hasCachedQueryInterval[pathId] = false;
+ while (resultIsNull(aggregateResults) && pathHasNext(pathId)) {
+ aggregateResults =
+ calcResult(
+ pathId,
+ new Pair<>(queryStartTimes[pathId], queryEndTimes[pathId]),
+ timestampGenerators.get(pathId),
+ cachedTimestamps.get(pathId),
+ allDataReaderList,
+ ascending);
+ hasCachedQueryInterval[pathId] = false;
+ }
+
+ cacheFillNext(pathId, aggregateResults, resultIndex);
+
+ hasCachedQueryInterval[pathId] = false;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
index 8a7705e..8cd15c0 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java
@@ -22,97 +22,50 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
-import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.IFill;
import org.apache.iotdb.db.query.executor.fill.LinearFill;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
-import org.apache.iotdb.db.query.executor.fill.ValueFill;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
-public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFilterDataSet {
+public class GroupByFillWithoutValueFilterDataSet extends
GroupByFillEngineDataSet {
private static final Logger logger =
LoggerFactory.getLogger(GroupByFillWithoutValueFilterDataSet.class);
- private Map<TSDataType, IFill> fillTypes;
- private final List<PartialPath> deduplicatedPaths;
- private final List<String> aggregations;
+ // path executors
+ private final Map<PartialPath, GroupByExecutor> pathExecutors = new
HashMap<>();
private Map<PartialPath, GroupByExecutor> extraPreviousExecutors = null;
private Map<PartialPath, GroupByExecutor> extraNextExecutors = null;
- // the extra previous means first not null value before startTime
- // used to fill result before the first not null data
- private Object[] extraPreviousValues;
- private long[] extraPreviousTimes;
-
- // the previous value for each time series, which means
- // first not null value GEQ curStartTime in order asc
- // second not null value GEQ curStartTime in order desc
- private Object[] previousValues;
- private long[] previousTimes;
-
- // the extra next means first not null value after endTime
- // used to fill result after the last not null data
- private Object[] extraNextValues;
- private long[] extraNextTimes;
-
- // the next value for each time series, which means
- // first not null value LEQ curStartTime in order desc
- // second not null value LEQ curStartTime in order asc
- private Object[] nextValues;
- private long[] nextTimes;
-
- // the result datatype for each time series
- private TSDataType[] resultDataType;
-
- // the next query time range of each path
- private long[] queryStartTimes;
- private long[] queryEndTimes;
- private boolean[] hasCachedQueryInterval;
-
public GroupByFillWithoutValueFilterDataSet(
- QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan)
- throws QueryProcessException, StorageEngineException {
+ QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan) {
super(context, groupByTimeFillPlan);
- this.aggregations = groupByTimeFillPlan.getDeduplicatedAggregations();
- this.deduplicatedPaths = new ArrayList<>();
- for (Path path : paths) {
- PartialPath partialPath = (PartialPath) path;
- if (!deduplicatedPaths.contains(partialPath)) {
- deduplicatedPaths.add(partialPath);
- }
- }
}
public void init(QueryContext context, GroupByTimeFillPlan
groupByTimeFillPlan)
throws QueryProcessException, StorageEngineException {
- initGroupBy(context, groupByTimeFillPlan);
- initArrays();
+ initPathExecutors(context, groupByTimeFillPlan);
+
initExtraExecutors(context, groupByTimeFillPlan);
if (extraPreviousExecutors != null) {
initExtraArrays(extraPreviousValues, extraPreviousTimes, true,
extraPreviousExecutors);
@@ -120,56 +73,33 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
if (extraNextExecutors != null) {
initExtraArrays(extraNextValues, extraNextTimes, false,
extraNextExecutors);
}
+
initCachedTimesAndValues();
}
- private void initArrays() {
- extraPreviousValues = new Object[aggregations.size()];
- extraPreviousTimes = new long[aggregations.size()];
- previousValues = new Object[aggregations.size()];
- previousTimes = new long[aggregations.size()];
- extraNextValues = new Object[aggregations.size()];
- extraNextTimes = new long[aggregations.size()];
- nextValues = new Object[aggregations.size()];
- nextTimes = new long[aggregations.size()];
- Arrays.fill(extraPreviousValues, null);
- Arrays.fill(extraPreviousTimes, Long.MIN_VALUE);
- Arrays.fill(previousValues, null);
- Arrays.fill(previousTimes, Long.MIN_VALUE);
- Arrays.fill(extraNextValues, null);
- Arrays.fill(extraNextTimes, Long.MAX_VALUE);
- Arrays.fill(nextValues, null);
- Arrays.fill(nextTimes, Long.MAX_VALUE);
-
- queryStartTimes = new long[paths.size()];
- queryEndTimes = new long[paths.size()];
- hasCachedQueryInterval = new boolean[paths.size()];
- resultDataType = new TSDataType[aggregations.size()];
- Arrays.fill(queryStartTimes, curStartTime);
- Arrays.fill(queryEndTimes, curEndTime);
- Arrays.fill(hasCachedQueryInterval, true);
- for (PartialPath deduplicatedPath : deduplicatedPaths) {
- List<Integer> indexes = resultIndexes.get(deduplicatedPath);
- for (int index : indexes) {
- switch (aggregations.get(index)) {
- case "avg":
- case "sum":
- resultDataType[index] = TSDataType.DOUBLE;
- break;
- case "count":
- case "max_time":
- case "min_time":
- resultDataType[index] = TSDataType.INT64;
- break;
- case "first_value":
- case "last_value":
- case "max_value":
- case "min_value":
- resultDataType[index] = dataTypes.get(index);
- break;
- }
- }
+ private void initPathExecutors(QueryContext context, GroupByTimeFillPlan
groupByTimeFillPlan)
+ throws QueryProcessException, StorageEngineException {
+ IExpression expression = groupByTimeFillPlan.getExpression();
+ Filter timeFilter = null;
+ if (expression != null) {
+ timeFilter = ((GlobalTimeExpression) expression).getFilter();
+ }
+ if (timeFilter == null) {
+ throw new QueryProcessException("TimeFilter cannot be null in GroupBy
query.");
}
+ getGroupByExecutors(pathExecutors, context, groupByTimeFillPlan,
timeFilter, ascending);
+ }
+
+ protected GroupByExecutor getGroupByExecutor(
+ PartialPath path,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ Filter timeFilter,
+ boolean ascending)
+ throws StorageEngineException, QueryProcessException {
+ return new LocalGroupByExecutor(
+ path, allSensors, dataType, context, timeFilter, null, ascending);
}
private void getGroupByExecutors(
@@ -196,15 +126,14 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
dataTypes.get(i),
context,
timeFilter.copy(),
- null,
isAscending));
}
- AggregateResult aggrResult =
+ AggregateResult aggregateResult =
AggregateResultFactory.getAggrResultByName(
groupByTimeFillPlan.getDeduplicatedAggregations().get(i),
dataTypes.get(i),
ascending);
- extraExecutors.get(path).addAggregateResult(aggrResult);
+ extraExecutors.get(path).addAggregateResult(aggregateResult);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -260,40 +189,6 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
}
}
- /* check if specified path has next extra range */
- private boolean pathHasExtra(int pathId, boolean isExtraPrevious, long
extraStartTime) {
- List<Integer> Indexes = resultIndexes.get(deduplicatedPaths.get(pathId));
- for (int resultIndex : Indexes) {
- if (isExtraPrevious && extraPreviousValues[resultIndex] != null) {
- continue;
- } else if (!isExtraPrevious && extraNextValues[resultIndex] != null) {
- continue;
- }
-
- IFill fill = fillTypes.get(resultDataType[resultIndex]);
- if (fill == null) {
- continue;
- }
- if (fill instanceof PreviousFill && isExtraPrevious) {
- if (fill.getQueryStartTime() <= extraStartTime) {
- return true;
- }
- } else if (fill instanceof LinearFill) {
- if (isExtraPrevious) {
- if (fill.getQueryStartTime() <= extraStartTime) {
- return true;
- }
- } else {
- if (extraStartTime < fill.getQueryEndTime()) {
- return true;
- }
- }
- }
- }
-
- return false;
- }
-
private void initExtraArrays(
Object[] extraValues,
long[] extraTimes,
@@ -335,48 +230,20 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
}
}
- private boolean pathHasNext(int pathId) {
- // has cached
- if (hasCachedQueryInterval[pathId]) {
- return true;
- }
-
- // find the next aggregation interval
- Pair<Long, Long> nextTimeRange = getNextTimeRange(queryStartTimes[pathId],
ascending, true);
- if (nextTimeRange == null) {
- return false;
- }
- queryStartTimes[pathId] = nextTimeRange.left;
- queryEndTimes[pathId] = nextTimeRange.right;
-
- hasCachedQueryInterval[pathId] = true;
- return true;
- }
-
- /* If result is null or CountAggrResult is 0, then result is NULL */
- private boolean resultIsNull(List<AggregateResult> aggregateResults) {
- AggregateResult result = aggregateResults.get(0);
- if (result.getResult() == null) {
- return true;
- } else {
- return result instanceof CountAggrResult && (long) result.getResult() ==
0;
- }
- }
-
- private void pathGetNext(int pathId) throws IOException {
+ @Override
+ protected void pathGetNext(int pathId) throws IOException {
GroupByExecutor executor =
pathExecutors.get(deduplicatedPaths.get(pathId));
List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
- // Slide value and time
- pathSlideNext(pathId);
+ cacheSlideNext(resultIndex);
- List<AggregateResult> aggregations;
+ List<AggregateResult> aggregateResults;
try {
// get second not null aggregate results
- aggregations = executor.calcResult(queryStartTimes[pathId],
queryEndTimes[pathId]);
+ aggregateResults = executor.calcResult(queryStartTimes[pathId],
queryEndTimes[pathId]);
hasCachedQueryInterval[pathId] = false;
- while (resultIsNull(aggregations) && pathHasNext(pathId)) {
- aggregations = executor.calcResult(queryStartTimes[pathId],
queryEndTimes[pathId]);
+ while (resultIsNull(aggregateResults) && pathHasNext(pathId)) {
+ aggregateResults = executor.calcResult(queryStartTimes[pathId],
queryEndTimes[pathId]);
hasCachedQueryInterval[pathId] = false;
}
} catch (QueryProcessException e) {
@@ -384,197 +251,8 @@ public class GroupByFillWithoutValueFilterDataSet extends
GroupByWithoutValueFil
throw new IOException(e.getMessage(), e);
}
- if (resultIsNull(aggregations)) {
- pathSlide(pathId);
- } else {
- for (int i = 0; i < aggregations.size(); i++) {
- int Index = resultIndex.get(i);
- if (ascending) {
- nextValues[Index] = aggregations.get(i).getResult();
- nextTimes[Index] = queryStartTimes[pathId];
- } else {
- previousValues[Index] = aggregations.get(i).getResult();
- previousTimes[Index] = queryStartTimes[pathId];
- }
- }
- }
+ cacheFillNext(pathId, aggregateResults, resultIndex);
hasCachedQueryInterval[pathId] = false;
}
-
- private void pathSlideNext(int pathId) {
- List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
- if (ascending) {
- for (int resultId : resultIndex) {
- previousValues[resultId] = nextValues[resultId];
- previousTimes[resultId] = nextTimes[resultId];
- nextValues[resultId] = null;
- nextTimes[resultId] = Long.MAX_VALUE;
- }
- } else {
- for (int resultId : resultIndex) {
- nextValues[resultId] = previousValues[resultId];
- nextTimes[resultId] = previousTimes[resultId];
- previousValues[resultId] = null;
- previousTimes[resultId] = Long.MIN_VALUE;
- }
- }
- }
-
- private void pathSlideExtra(int pathId) {
- List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
- if (ascending) {
- for (int Index : resultIndex) {
- nextValues[Index] = extraNextValues[Index];
- nextTimes[Index] = extraNextTimes[Index];
- }
- } else {
- for (int Index : resultIndex) {
- previousValues[Index] = extraPreviousValues[Index];
- previousTimes[Index] = extraPreviousTimes[Index];
- }
- }
- }
-
- private void pathSlide(int pathId) throws IOException {
- if (pathHasNext(pathId)) {
- pathGetNext(pathId);
- } else {
- pathSlideExtra(pathId);
- }
- }
-
- /* Cache the previous and next query data before group by fill query */
- private void initCachedTimesAndValues() throws QueryProcessException {
- for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) {
- try {
- pathSlide(pathId);
- pathSlide(pathId);
- } catch (IOException e) {
- throw new QueryProcessException(e.getMessage());
- }
- }
- }
-
- private void fillRecord(
- int resultId, RowRecord record, Pair<Long, Object> beforePair,
Pair<Long, Object> afterPair)
- throws IOException {
- // Don't fill count aggregation
- if (Objects.equals(aggregations.get(resultId), "count")) {
- record.addField((long) 0, TSDataType.INT64);
- return;
- }
-
- IFill fill = fillTypes.get(resultDataType[resultId]);
- if (fill == null) {
- record.addField(null);
- return;
- }
-
- if (fill instanceof PreviousFill) {
- if (beforePair.right != null
- && (fill.getBeforeRange() == -1
- || fill.insideBeforeRange(beforePair.left,
record.getTimestamp()))
- && ((!((PreviousFill) fill).isUntilLast())
- || (afterPair.right != null && afterPair.left < endTime))) {
- record.addField(beforePair.right, resultDataType[resultId]);
- } else {
- record.addField(null);
- }
- } else if (fill instanceof LinearFill) {
- LinearFill linearFill = new LinearFill();
- if (beforePair.right != null
- && afterPair.right != null
- && (fill.getBeforeRange() == -1
- || fill.insideBeforeRange(beforePair.left,
record.getTimestamp()))
- && (fill.getAfterRange() == -1
- || fill.insideAfterRange(afterPair.left,
record.getTimestamp()))) {
- try {
- TimeValuePair filledPair =
- linearFill.averageWithTimeAndDataType(
- new TimeValuePair(
- beforePair.left,
- TsPrimitiveType.getByType(resultDataType[resultId],
beforePair.right)),
- new TimeValuePair(
- afterPair.left,
- TsPrimitiveType.getByType(resultDataType[resultId],
afterPair.right)),
- curStartTime,
- resultDataType[resultId]);
- record.addField(filledPair.getValue().getValue(),
resultDataType[resultId]);
- } catch (UnSupportedFillTypeException e) {
- record.addField(null);
- throw new IOException(e);
- }
- } else {
- record.addField(null);
- }
- } else if (fill instanceof ValueFill) {
- try {
- TimeValuePair filledPair = fill.getFillResult();
- record.addField(filledPair.getValue().getValue(),
resultDataType[resultId]);
- } catch (QueryProcessException | StorageEngineException e) {
- throw new IOException(e);
- }
- }
- }
-
- @Override
- public RowRecord nextWithoutConstraint() throws IOException {
- if (!hasCachedTimeInterval) {
- throw new IOException(
- "need to call hasNext() before calling next() "
- + "in GroupByFillWithoutValueFilterDataSet.");
- }
- hasCachedTimeInterval = false;
- RowRecord record = new RowRecord(curStartTime);
-
- boolean[] pathNeedSlide = new boolean[previousTimes.length];
- Arrays.fill(pathNeedSlide, false);
- for (int resultId = 0; resultId < previousTimes.length; resultId++) {
- if (previousTimes[resultId] == curStartTime) {
- record.addField(previousValues[resultId], resultDataType[resultId]);
- if (!ascending) {
- pathNeedSlide[resultId] = true;
- }
- } else if (nextTimes[resultId] == curStartTime) {
- record.addField(nextValues[resultId], resultDataType[resultId]);
- if (ascending) {
- pathNeedSlide[resultId] = true;
- }
- } else if (previousTimes[resultId] < curStartTime && curStartTime <
nextTimes[resultId]) {
- fillRecord(
- resultId,
- record,
- new Pair<>(previousTimes[resultId], previousValues[resultId]),
- new Pair<>(nextTimes[resultId], nextValues[resultId]));
- } else if (curStartTime < previousTimes[resultId]) {
- fillRecord(
- resultId,
- record,
- new Pair<>(extraPreviousTimes[resultId],
extraPreviousValues[resultId]),
- new Pair<>(previousTimes[resultId], previousValues[resultId]));
- } else if (nextTimes[resultId] < curStartTime) {
- fillRecord(
- resultId,
- record,
- new Pair<>(nextTimes[resultId], nextValues[resultId]),
- new Pair<>(extraNextTimes[resultId], extraNextValues[resultId]));
- }
- }
-
- // Slide paths
- // the aggregation results of one path are either all null or all not null,
- // thus slide all results together
- for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) {
- List<Integer> resultIndex =
resultIndexes.get(deduplicatedPaths.get(pathId));
- if (pathNeedSlide[resultIndex.get(0)]) {
- pathSlide(pathId);
- }
- }
-
- if (!leftCRightO) {
- record.setTimestamp(curEndTime - 1);
- }
- return record;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index a167058..4ea2035 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -50,8 +50,8 @@ import java.util.stream.Collectors;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
- private List<IReaderByTimestamp> allDataReaderList;
- private GroupByTimePlan groupByTimePlan;
+ protected List<IReaderByTimestamp> allDataReaderList;
+ protected GroupByTimePlan groupByTimePlan;
private TimeGenerator timestampGenerator;
/** cached timestamp for next group by partition. */
private LinkedList<Long> cachedTimestamps = new LinkedList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index e06e4c3..017780c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByFillEngineDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithValueFilterDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByFillWithoutValueFilterDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByLevelDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
@@ -243,6 +245,12 @@ public class QueryRouter implements IQueryRouter {
return new GroupByWithValueFilterDataSet(context, plan);
}
+ protected GroupByFillWithValueFilterDataSet getGroupByFillWithValueFilterDataSet(
+ QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan)
+ throws QueryProcessException, StorageEngineException {
+ return new GroupByFillWithValueFilterDataSet(context, groupByTimeFillPlan);
+ }
+
protected GroupByFillWithoutValueFilterDataSet getGroupByFillWithoutValueFilterDataSet(
QueryContext context, GroupByTimeFillPlan groupByFillPlan)
throws QueryProcessException, StorageEngineException {
@@ -264,21 +272,18 @@ public class QueryRouter implements IQueryRouter {
public QueryDataSet groupByFill(GroupByTimeFillPlan groupByFillPlan,
QueryContext context)
throws QueryFilterOptimizationException, StorageEngineException,
QueryProcessException {
- GroupByFillWithoutValueFilterDataSet dataSet;
+ GroupByFillEngineDataSet dataSet;
IExpression optimizedExpression = getOptimizeExpression(groupByFillPlan);
groupByFillPlan.setExpression(optimizedExpression);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
dataSet = getGroupByFillWithoutValueFilterDataSet(context, groupByFillPlan);
- dataSet.init(context, groupByFillPlan);
+ ((GroupByFillWithoutValueFilterDataSet) dataSet).init(context, groupByFillPlan);
} else {
- // dataSet = getGroupByFillWithValueFilterDataSet(context, groupByFillPlan);
- // dataSet.init(context, groupByFillPlan);
- throw new QueryProcessException("Group by fill doesn't support valueFilter yet.");
+ dataSet = getGroupByFillWithValueFilterDataSet(context, groupByFillPlan);
+ ((GroupByFillWithValueFilterDataSet) dataSet).init(context, groupByFillPlan);
}
- // TODO: support group by level in group by fill
-
return dataSet;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillIT.java
index d06315d..620ae93 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillIT.java
@@ -137,6 +137,60 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void previousLastValueWithValueFilterTest() {
+ String[] retArray =
+ new String[] {"17,null", "22,28", "27,28", "32,29", "37,29", "42,29", "47,30", "52,30"};
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature > 25 "
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[previous])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(last_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature > 25 "
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[previous]) order by time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(last_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void previousFirstValueTest() {
String[] retArray =
new String[] {
@@ -246,6 +300,62 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void previousAvgWithValueFilterTest() {
+ String[] retArray =
+ new String[] {
+ "17,null", "22,null", "27,null", "32,44.7", "37,44.7", "42,44.7",
"47,55.2", "52,55.2"
+ };
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select avg(hardware) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE hardware > 35.0 "
+ + "GROUP BY ([17, 55), 5ms) FILL(double[previous])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(avg("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select avg(hardware) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE hardware > 35.0 "
+ + "GROUP BY ([17, 55), 5ms) FILL(double[previous]) order by
time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(avg("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void previousCountTest() {
String[] retArray =
new String[] {"17,0", "22,2", "27,0", "32,2", "37,0", "42,0", "47,2",
"52,0"};
@@ -298,6 +408,60 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void previousCountWithValueFilterTest() {
+ String[] retArray =
+ new String[] {"17,0", "22,1", "27,0", "32,1", "37,0", "42,0", "47,1",
"52,0"};
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(status) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE status = true "
+ + "GROUP BY ([17, 55), 5ms) FILL(int64[previous])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.ln.wf01.wt01.status"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(status) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE status = true "
+ + "GROUP BY ([17, 55), 5ms) FILL(int64[previous]) order by
time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.ln.wf01.wt01.status"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void previousMaxTimeTest() {
String[] retArray =
new String[] {"17,null", "22,25", "27,25", "32,36", "37,36", "42,36",
"47,50", "52,50"};
@@ -671,6 +835,63 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void previousUntilLastFirstValueWithValueFilterTest() {
+ String[] retArray =
+ new String[] {
+ "17,null", "22,null", "27,null", "32,44.6", "37,44.6", "42,44.6",
"47,54.6", "52,null"
+ };
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select first_value(hardware) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE hardware > 35.0 "
+ + "GROUP BY ([17, 55), 5ms) FILL(double[previousUntilLast])");
+
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(first_value("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select first_value(hardware) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE hardware > 35.0 "
+ + "GROUP BY ([17, 55), 5ms) FILL(double[previousUntilLast]) order by time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(first_value("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void previousUntilLastAvgTest() {
String[] retArray =
new String[] {
@@ -881,6 +1102,62 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void previousUntilLastMaxValueWithValueFilterTest() {
+ String[] retArray =
+ new String[] {
+ "17,null", "22,23", "27,23", "32,24", "37,null", "42,null",
"47,null", "52,null"
+ };
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature <= 25"
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[previousUntilLast])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(max_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature <= 25"
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[previousUntilLast])
order by time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(max_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void previousUntilLastMinTimeTest() {
String[] retArray =
new String[] {"17,null", "22,23", "27,23", "32,33", "37,33", "42,33",
"47,48", "52,null"};
@@ -1497,6 +1774,60 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void valueMinValueWithValueFilterTest() {
+ String[] retArray =
+ new String[] {"17,10", "22,28", "27,10", "32,29", "37,10", "42,10",
"47,28", "52,10"};
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select min_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature > 25 "
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[10])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select min_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature > 25 "
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[10]) order by time
desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void valueSumTest() {
String[] retArray =
new String[] {
@@ -1987,6 +2318,60 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void linearMinValueWithValueFilterTest() {
+ String[] retArray =
+ new String[] {"17,null", "22,28", "27,28", "32,29", "37,29", "42,29",
"47,28", "52,null"};
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select min_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature > 25"
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[linear])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select min_value(temperature) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE temperature > 25"
+ + "GROUP BY ([17, 55), 5ms) FILL(int32[linear]) order by
time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void linearSumTest() {
String[] retArray =
new String[] {
@@ -2041,6 +2426,69 @@ public class IoTDBGroupByFillIT {
}
@Test
+ public void linearSumWithValueFilterTest() {
+ String[] retArray =
+ new String[] {
+ "17,null",
+ "22,34.9",
+ "27,62.150000000000006",
+ "32,89.4",
+ "37,96.4",
+ "42,103.4",
+ "47,110.4",
+ "52,null"
+ };
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select sum(hardware) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE hardware > 32.0 "
+ + "GROUP BY ([17, 55), 5ms) FILL(double[linear])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select sum(hardware) from "
+ + "root.ln.wf01.wt01 "
+ + "WHERE hardware > 32.0 "
+ + "GROUP BY ([17, 55), 5ms) FILL(double[linear]) order by
time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void leftORightCPreviousTest() {
String[] retArray =
new String[] {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillMixPathsIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillMixPathsIT.java
index 7068448..9c018f9 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillMixPathsIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByFillMixPathsIT.java
@@ -77,6 +77,17 @@ public class IoTDBGroupByFillMixPathsIT {
"INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(72, 33)",
"INSERT INTO root.ln.wf01.wt01(timestamp, status) values(74, true)",
"INSERT INTO root.ln.wf01.wt01(timestamp, hardware) values(75, 46.8)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(110, 21, false, 11.1)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(112, 23, true, 22.3)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(114, 25, false, 33.5)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(123, 28, true, 34.9)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(125, 23, false, 31.7)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(133, 29, false, 44.6)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(136, 24, true, 44.8)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(148, 28, false, 54.6)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(150, 30, true, 55.8)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(166, 40, false, 33.0)",
"flush"
};
@@ -291,4 +302,184 @@ public class IoTDBGroupByFillMixPathsIT {
fail(e.getMessage());
}
}
+
+ @Test
+ public void singlePathMixWithValueFilterTest() {
+ String[] retArray =
+ new String[] {
+ // "112,33.5,33.5,114"
+ "117,null,null,114",
+ "122,39.05,39.05,114",
+ "127,null,null,114",
+ "132,44.6,44.6,133",
+ "137,47.93333333333334,47.93333333333334,133",
+ "142,51.266666666666666,51.266666666666666,133",
+ "147,54.6,54.6,148",
+ "152,47.4,47.4,148"
+ // "162,33.0,33.0,166"
+ };
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select sum(hardware), last_value(hardware), max_time(hardware) "
+ + "from root.ln.wf01.wt01 "
+ + "WHERE temperature >= 25 and status = false "
+ + "GROUP BY ([117, 155), 5ms) "
+ + "FILL(double[linear, 12ms, 12ms], int64[previous, 17ms])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.hardware"))
+ + ","
+ +
resultSet.getString(last_value("root.ln.wf01.wt01.hardware"))
+ + ","
+ +
resultSet.getString(max_time("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select sum(hardware), last_value(hardware), max_time(hardware) "
+ + "from root.ln.wf01.wt01 "
+ + "WHERE temperature >= 25 and status = false "
+ + "GROUP BY ([117, 155), 5ms) "
+ + "FILL(double[linear, 12ms, 12ms], int64[previous, 17ms]) "
+ + "order by time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.hardware"))
+ + ","
+ +
resultSet.getString(last_value("root.ln.wf01.wt01.hardware"))
+ + ","
+ +
resultSet.getString(max_time("root.ln.wf01.wt01.hardware"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void MultiPathsMixWithValueFilterTest() {
+ String[] retArray =
+ new String[] {
+ "117,null,null,114,26,null,true",
+ "122,27.0,39.05,114,27,39.05,true",
+ "127,null,null,114,null,null,true",
+ "132,29.0,44.6,133,29,44.6,false",
+ "137,28.666666666666668,47.93333333333334,133,29,47.93333333333334,true",
+ "142,28.333333333333332,51.266666666666666,133,29,51.266666666666666,true",
+ "147,28.0,54.6,148,28,54.6,false",
+ "152,32.0,47.4,null,32,47.4,true"
+ };
+
+ /* Format result
+ linear, linear, preUntil, linear, linear, value
+ 7, 25.0, 33.5, 114, 25, 33.5, false
+ 117, null, null, 114(null), 26(null), null, true(null)
+ 122, 27.0(null), 39.05(null), 114(null), 27(null), 39.05(null), true(null)
+ 127, null, null, 114(null), null, null, true(null)
+ 132, 29.0, 44.6, 133, 29, 44.6, false
+ 137, 28.67(null), 47.93(null), 133(null), 29(null), 47.93(null), true(null)
+ 142, 28.33(null), 51.27(null), 133(null), 29(null), 51.27(null), true(null)
+ 147, 28.0, 54.6, 148, 28, 54.6, false
+ 152, 32.0(null), 47.4(null), null, 32(null), 47.4(null), true(null)
+ 162, 40.0, 33.0, null, 40, 33.0, null
+ */
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select sum(temperature), avg(hardware), max_time(status), "
+ + "min_value(temperature), max_value(hardware), first_value(status) "
+ + "from root.ln.wf01.wt01 "
+ + "WHERE temperature >= 25 and status = false "
+ + "GROUP BY ([117, 155), 5ms) "
+ + "FILL(double[linear, 12ms, 12ms], int32[linear, 12ms, 18ms], "
+ + "int64[previousUntilLast, 17ms], boolean[true])");
+ assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.temperature"))
+ + ","
+ + resultSet.getString(avg("root.ln.wf01.wt01.hardware"))
+ + ","
+ + resultSet.getString(max_time("root.ln.wf01.wt01.status"))
+ + ","
+ +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))
+ + ","
+ +
resultSet.getString(max_value("root.ln.wf01.wt01.hardware"))
+ + ","
+ +
resultSet.getString(first_value("root.ln.wf01.wt01.status"));
+ assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select sum(temperature), avg(hardware), max_time(status), "
+ + "min_value(temperature), max_value(hardware), first_value(status) "
+ + "from root.ln.wf01.wt01 "
+ + "WHERE temperature >= 25 and status = false "
+ + "GROUP BY ([117, 155), 5ms) "
+ + "FILL(double[linear, 12ms, 12ms], int32[linear, 12ms, 18ms], "
+ + "int64[previousUntilLast, 17ms], boolean[true]) "
+ + "order by time desc");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.temperature"))
+ + ","
+ + resultSet.getString(avg("root.ln.wf01.wt01.hardware"))
+ + ","
+ + resultSet.getString(max_time("root.ln.wf01.wt01.status"))
+ + ","
+ +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))
+ + ","
+ +
resultSet.getString(max_value("root.ln.wf01.wt01.hardware"))
+ + ","
+ +
resultSet.getString(first_value("root.ln.wf01.wt01.status"));
+ assertEquals(retArray[retArray.length - cnt - 1], ans);
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
index 65d157b..ba6578a 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
@@ -108,93 +108,87 @@ public class GroupByFillDataSetTest {
}
}
- // TODO: @CRZbulabula
- // GroupByFillWithValueFilterDataSet
- // @Test
- // public void groupByWithValueFilterFillTest() throws Exception {
- // QueryPlan queryPlan =
- // (QueryPlan)
- // processor.parseSQLToPhysicalPlan(
- // "select last_value(s0) from root.vehicle.* where s1 > 1
group by ([0,20), 1ms)
- // fill (int32[Previous]) order by time desc");
- // QueryDataSet dataSet =
- // queryExecutor.processQuery(queryPlan,
EnvironmentUtils.TEST_QUERY_CONTEXT);
- // for (int i = 19; i >= 7; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\t7", dataSet.next().toString());
- // }
- // assertTrue(dataSet.hasNext());
- // assertEquals("6\t6", dataSet.next().toString());
- // for (int i = 5; i >= 0; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\tnull", dataSet.next().toString());
- // }
- // }
+ @Test
+ public void groupByWithValueFilterFillTest() throws Exception {
+ QueryPlan queryPlan =
+ (QueryPlan)
+ processor.parseSQLToPhysicalPlan(
+ "select last_value(s0) from root.vehicle.* where s1 > 1 group by ([0,20), 1ms) fill (int32[Previous]) order by time desc");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ for (int i = 19; i >= 7; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\t7", dataSet.next().toString());
+ }
+ assertTrue(dataSet.hasNext());
+ assertEquals("6\t6", dataSet.next().toString());
+ for (int i = 5; i >= 0; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\tnull", dataSet.next().toString());
+ }
+ }
+
+ @Test
+ public void groupByWithAndFilterFillTest() throws Exception {
+ QueryPlan queryPlan =
+ (QueryPlan)
+ processor.parseSQLToPhysicalPlan(
+ "select last_value(s0) from root.vehicle.* where s1 > 1 or s0 > 1 group by ([0,20), 1ms) fill (int32[Previous]) order by time desc");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ for (int i = 19; i >= 8; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\t8", dataSet.next().toString());
+ }
+ assertTrue(dataSet.hasNext());
+ assertEquals("7\t7", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("6\t6", dataSet.next().toString());
+ for (int i = 5; i >= 0; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\tnull", dataSet.next().toString());
+ }
+ }
- // @Test
- // public void groupByWithAndFilterFillTest() throws Exception {
- // QueryPlan queryPlan =
- // (QueryPlan)
- // processor.parseSQLToPhysicalPlan(
- // "select last_value(s0) from root.vehicle.* where s1 > 1 or
s0 > 1 group by
- // ([0,20), 1ms) fill (int32[Previous]) order by time desc");
- // QueryDataSet dataSet =
- // queryExecutor.processQuery(queryPlan,
EnvironmentUtils.TEST_QUERY_CONTEXT);
- // for (int i = 19; i >= 8; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\t8", dataSet.next().toString());
- // }
- // assertTrue(dataSet.hasNext());
- // assertEquals("7\t7", dataSet.next().toString());
- // assertTrue(dataSet.hasNext());
- // assertEquals("6\t6", dataSet.next().toString());
- // for (int i = 5; i >= 0; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\tnull", dataSet.next().toString());
- // }
- // }
- //
- // @Test
- // public void groupByWithFirstNullTest() throws Exception {
- // QueryPlan queryPlan =
- // (QueryPlan)
- // processor.parseSQLToPhysicalPlan(
- // "select last_value(s0) from root.vehicle.* where s1 > 1 or
s0 > 1 group by
- // ([5,20), 1ms) fill (int32[Previous]) order by time desc");
- // QueryDataSet dataSet =
- // queryExecutor.processQuery(queryPlan,
EnvironmentUtils.TEST_QUERY_CONTEXT);
- // for (int i = 19; i >= 8; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\t8", dataSet.next().toString());
- // }
- // assertTrue(dataSet.hasNext());
- // assertEquals("7\t7", dataSet.next().toString());
- // assertTrue(dataSet.hasNext());
- // assertEquals("6\t6", dataSet.next().toString());
- // assertTrue(dataSet.hasNext());
- // assertEquals("5\t1", dataSet.next().toString());
- // }
- //
- // @Test
- // public void groupByWithCross() throws Exception {
- // QueryPlan queryPlan =
- // (QueryPlan)
- // processor.parseSQLToPhysicalPlan(
- // "select last_value(s0) from root.vehicle.* where s2 > 1
group by ([0,20), 1ms)
- // fill (int32[Previous]) order by time desc");
- // QueryDataSet dataSet =
- // queryExecutor.processQuery(queryPlan,
EnvironmentUtils.TEST_QUERY_CONTEXT);
- // for (int i = 19; i >= 8; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\t7", dataSet.next().toString());
- // }
- // assertTrue(dataSet.hasNext());
- // assertEquals("7\t7", dataSet.next().toString());
- // assertTrue(dataSet.hasNext());
- // assertEquals("6\t6", dataSet.next().toString());
- // for (int i = 5; i >= 0; i--) {
- // assertTrue(dataSet.hasNext());
- // assertEquals(i + "\tnull", dataSet.next().toString());
- // }
- // }
+ @Test
+ public void groupByWithFirstNullTest() throws Exception {
+ QueryPlan queryPlan =
+ (QueryPlan)
+ processor.parseSQLToPhysicalPlan(
+ "select last_value(s0) from root.vehicle.* where s1 > 1 or s0 >= 1 group by ([5,20), 1ms) fill (int32[Previous, 11ms]) order by time desc");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ for (int i = 19; i >= 8; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\t8", dataSet.next().toString());
+ }
+ assertTrue(dataSet.hasNext());
+ assertEquals("7\t7", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("6\t6", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("5\t1", dataSet.next().toString());
+ }
+
+ @Test
+ public void groupByWithCross() throws Exception {
+ QueryPlan queryPlan =
+ (QueryPlan)
+ processor.parseSQLToPhysicalPlan(
+ "select last_value(s0) from root.vehicle.* where s2 > 1 group by ([0,20), 1ms) fill (int32[Previous]) order by time desc");
+ QueryDataSet dataSet =
+ queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ for (int i = 19; i >= 8; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\t7", dataSet.next().toString());
+ }
+ assertTrue(dataSet.hasNext());
+ assertEquals("7\t7", dataSet.next().toString());
+ assertTrue(dataSet.hasNext());
+ assertEquals("6\t6", dataSet.next().toString());
+ for (int i = 5; i >= 0; i--) {
+ assertTrue(dataSet.hasNext());
+ assertEquals(i + "\tnull", dataSet.next().toString());
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
index ffe6538..1cba9c6 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
@@ -41,10 +41,18 @@ public abstract class BinaryFilter implements Filter, Serializable {
this.right = right;
}
+ public void setLeft(Filter left) {
+ this.left = left;
+ }
+
public Filter getLeft() {
return left;
}
+ public void setRight(Filter right) {
+ this.right = right;
+ }
+
public Filter getRight() {
return right;
}