This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7e68943cab0fb1ab53654c5a324673ba86c9058b Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Mar 1 23:20:29 2022 +0800 complete basic design of all operators --- .../distribution/common/GroupByTimeParameter.java | 10 +++++++ .../query/distribution/common/LevelBucketInfo.java | 15 ++++++++++ .../distribution/common/SeriesBatchAggInfo.java | 21 +++++++++++++ .../operator/GroupByLevelOperator.java | 34 ++++++++++++++++++++++ .../operator/SeriesAggregateOperator.java | 33 +++++++++++++++++++++ .../distribution/operator/SeriesScanOperator.java | 10 +++++++ 6 files changed, 123 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java new file mode 100644 index 0000000..3f7e1bc --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java @@ -0,0 +1,10 @@ +package org.apache.iotdb.db.query.distribution.common; + +import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; + +/** + * In single-node IoTDB, the GroupByTimePlan is used to represent the parameter of `group by time`. 
+ * To avoid ambiguity, we use a different name, `GroupByTimeParameter`, here. + */ +public class GroupByTimeParameter extends GroupByTimePlan { +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java new file mode 100644 index 0000000..6888475 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java @@ -0,0 +1,15 @@ +package org.apache.iotdb.db.query.distribution.common; + +import java.util.List; +import java.util.Map; + +/** + * This class is used to store all the buckets for the GroupByLevelOperator. + * It stores the level indexes and all the enumerated values of each level in a HashMap. + * Using the HashMap, the operator can calculate all the buckets from combinations of values from each level. + */ +public class LevelBucketInfo { + // e.g. If the clause is `group by level = 1, 2, 3`, the map should look like + // map{1 -> ['a', 'b'], 2 -> ['aa', 'bb'], 3 -> ['aaa', 'bbb']} + private Map<Integer, List<String>> levelMap; +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java new file mode 100644 index 0000000..2ea9f9a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java @@ -0,0 +1,21 @@ +package org.apache.iotdb.db.query.distribution.common; + +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.TimeRange; + +/** + * SeriesBatchAggInfo is the "batch" result of SeriesAggregateOperator when its getNextBatch() is invoked. + */ +public class SeriesBatchAggInfo { + // Path of the series. + // Path will be used in the downstream operators.
+ // GroupByLevelOperator will use it to divide the data into different buckets to do the rollup operation. + private Path path; + + // Time range of the current statistics. + private TimeRange timeRange; + + // Statistics for the series in the current time range + private Statistics<?> statistics; +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java new file mode 100644 index 0000000..f5fe41d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java @@ -0,0 +1,34 @@ +package org.apache.iotdb.db.query.distribution.operator; + +import org.apache.iotdb.db.query.distribution.common.GroupByTimeParameter; +import org.apache.iotdb.db.query.distribution.common.LevelBucketInfo; +import org.apache.iotdb.db.query.distribution.common.Tablet; + +/** + * This operator is responsible for the final aggregation merge operation. + * It first arranges the data by time range. Inside each time range, the data from the same measurement and + * different devices will be rolled up by the corresponding level into different buckets. + * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up. + * If the groupByTimeParameter is null, the data won't be split by time range. + * + * Children type: [SeriesAggregateOperator] + */ +public class GroupByLevelOperator extends ExecOperator<Tablet> { + + // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
+ private LevelBucketInfo bucketInfo; + + // The parameter of `group by time` + // The GroupByLevelOperator also needs the GroupByTimeParameter + private GroupByTimeParameter groupByTimeParameter; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java new file mode 100644 index 0000000..e5d02f8 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java @@ -0,0 +1,33 @@ +package org.apache.iotdb.db.query.distribution.operator; + +import org.apache.iotdb.db.query.distribution.common.GroupByTimeParameter; +import org.apache.iotdb.db.query.distribution.common.SeriesBatchAggInfo; + +/** + * SeriesAggregateOperator is responsible for the aggregation calculation of one series. + * This operator will split the data of one series into groups by time range and do the aggregation calculation for each + * group. + * If there is no split parameter, it will return one result, which is the aggregation result of all data in the current series.
+ * + * Children type: [SeriesScanOperator] + */ +public class SeriesAggregateOperator extends ExecOperator<SeriesBatchAggInfo> { + + // The parameter of `group by time` + // Its value will be null if there is no `group by time` clause. + private GroupByTimeParameter groupByTimeParameter; + + // TODO: need to consider how to represent the aggregation function and its corresponding implementation + // We use a String to indicate the parameter temporarily + private String aggregationFunc; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchAggInfo getNextBatch() { + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java index 11527a5..94bc7da 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java @@ -2,7 +2,9 @@ package org.apache.iotdb.db.query.distribution.operator; import org.apache.iotdb.db.query.distribution.common.SeriesBatchData; import org.apache.iotdb.db.query.distribution.common.TraversalOrder; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.filter.basic.Filter; /** @@ -44,4 +46,12 @@ public class SeriesScanOperator extends ExecOperator<SeriesBatchData> { public SeriesBatchData getNextBatch() { return null; } + + // This method will only be invoked by SeriesAggregateOperator + // It will return the statistics of the series in the given time range + // When calculating the statistics, the operator should use the most efficient way available. In other words, using + // raw data should be the last resort.
+ public Statistics<?> getNextStatisticBetween(TimeRange timeRange) { + return null; + } }
