This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch groupbyfill in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0914138cf53aa1636a78816cde55d4954fe715fc Author: JackieTien97 <[email protected]> AuthorDate: Mon Feb 24 17:10:29 2020 +0800 need to wait for last --- .../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 22 ++- .../apache/iotdb/db/qp/constant/SQLConstant.java | 6 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 17 +- .../org/apache/iotdb/db/qp/logical/Operator.java | 2 +- .../iotdb/db/qp/physical/crud/GroupByFillPlan.java | 25 +++ .../iotdb/db/qp/strategy/LogicalGenerator.java | 202 +++++++-------------- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 70 ++----- .../dataset/groupby/GroupByEngineDataSet.java | 10 + .../query/dataset/groupby/GroupByFillDataSet.java | 85 +++++++++ .../iotdb/db/query/executor/IQueryRouter.java | 12 +- .../iotdb/db/query/executor/QueryRouter.java | 15 +- .../apache/iotdb/db/query/fill/PreviousFill.java | 17 +- 12 files changed, 268 insertions(+), 215 deletions(-) diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 index 1a8558f..2434075 100644 --- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 +++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 @@ -153,6 +153,7 @@ fromClause specialClause : specialLimit | groupByClause specialLimit? + | groupByFillClause | fillClause slimitClause? alignByDeviceClauseOrDisableAlign? ; @@ -204,9 +205,18 @@ groupByClause RR_BRACKET ; +groupByFillClause + : GROUP BY LR_BRACKET + timeInterval + COMMA DURATION + RR_BRACKET + fillClause + ; + typeClause : dataType LS_BRACKET linearClause RS_BRACKET - | dataType LS_BRACKET previousClause RS_BRACKET + | dataType LS_BRACKET previousClause RS_BRACKET + | dataType LS_BRACKET previousUntilLastClause RS_BRACKET ; linearClause @@ -217,6 +227,10 @@ previousClause : PREVIOUS (COMMA DURATION)? ; +previousUntilLastClause + : PREVIOUSUNTILLAST + ; + indexWithClause : WITH indexValue (COMMA indexValue)? 
; @@ -302,7 +316,7 @@ nodeNameWithoutStar ; dataType - : INT32 | INT64 | FLOAT | DOUBLE | BOOLEAN | TEXT + : INT32 | INT64 | FLOAT | DOUBLE | BOOLEAN | TEXT | ALL ; dateFormat @@ -440,6 +454,10 @@ PREVIOUS : P R E V I O U S ; +PREVIOUSUNTILLAST + : P R E V I O U S U N T I L L A S T + ; + METADATA : M E T A D A T A ; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java index 108c850..c5ec94a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java @@ -18,14 +18,12 @@ */ package org.apache.iotdb.db.qp.constant; +import org.apache.iotdb.db.qp.strategy.SqlBaseLexer; import org.apache.iotdb.tsfile.read.common.Path; import java.util.HashMap; import java.util.Map; -import org.apache.iotdb.db.qp.strategy.SqlBaseLexer; -import org.apache.iotdb.tsfile.read.common.Path; - /** * this class contains several constants used in SQL. 
*/ @@ -66,6 +64,8 @@ public class SQLConstant { public static final String AVG = "avg"; public static final String SUM = "sum"; + public static final String ALL = "all"; + public static final int KW_AND = 1; public static final int KW_OR = 2; public static final int KW_NOT = 3; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 2cf135d..e968e7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -71,16 +71,7 @@ import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; -import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; -import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; -import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; -import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; -import org.apache.iotdb.db.qp.physical.crud.QueryPlan; -import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; -import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.qp.physical.crud.*; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.CountPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; @@ -228,9 +219,11 @@ public class PlanExecutor implements IPlanExecutor { if (queryPlan instanceof AlignByDevicePlan) { queryDataSet = new AlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter); } else { - if (queryPlan instanceof GroupByPlan) { + if (queryPlan instanceof 
GroupByFillPlan) { + GroupByFillPlan groupByFillPlan = (GroupByFillPlan) queryPlan; + } else if (queryPlan instanceof GroupByPlan) { GroupByPlan groupByPlan = (GroupByPlan) queryPlan; - return queryRouter.groupBy(groupByPlan, context); + return queryRouter.groupByFill(groupByPlan, context); } else if (queryPlan instanceof AggregationPlan) { AggregationPlan aggregationPlan = (AggregationPlan) queryPlan; queryDataSet = queryRouter.aggregate(aggregationPlan, context); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java index b9a44f2..7144b40 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java @@ -74,6 +74,6 @@ public abstract class Operator { DELETE_ROLE, GRANT_ROLE_PRIVILEGE, REVOKE_ROLE_PRIVILEGE, LIST_USER, LIST_ROLE, LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS, GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING, - TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE + TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, GROUP_BY_FILL } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java new file mode 100644 index 0000000..432f2bc --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java @@ -0,0 +1,25 @@ +package org.apache.iotdb.db.qp.physical.crud; + +import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.query.fill.IFill; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import java.util.Map; + +public class GroupByFillPlan extends GroupByPlan { + + private Map<TSDataType, IFill> fillTypes; + + public GroupByFillPlan() { + super(); + 
setOperatorType(Operator.OperatorType.GROUP_BY_FILL); + } + + public Map<TSDataType, IFill> getFillType() { + return fillTypes; + } + + public void setFillType(Map<TSDataType, IFill> fillTypes) { + this.fillTypes = fillTypes; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java index 9c5d837..0180540 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java @@ -18,128 +18,16 @@ */ package org.apache.iotdb.db.qp.strategy; -import java.io.File; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.antlr.v4.runtime.tree.TerminalNode; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.runtime.SQLParserException; import org.apache.iotdb.db.qp.constant.DatetimeUtils; import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.logical.RootOperator; -import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; -import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator; -import org.apache.iotdb.db.qp.logical.crud.FilterOperator; -import org.apache.iotdb.db.qp.logical.crud.FromOperator; -import org.apache.iotdb.db.qp.logical.crud.InOperator; -import org.apache.iotdb.db.qp.logical.crud.InsertOperator; -import org.apache.iotdb.db.qp.logical.crud.QueryOperator; -import org.apache.iotdb.db.qp.logical.crud.SelectOperator; -import org.apache.iotdb.db.qp.logical.crud.UpdateOperator; -import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; +import org.apache.iotdb.db.qp.logical.crud.*; +import org.apache.iotdb.db.qp.logical.sys.*; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType; -import 
org.apache.iotdb.db.qp.logical.sys.CountOperator; -import org.apache.iotdb.db.qp.logical.sys.CreateTimeSeriesOperator; -import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator; -import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator; -import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator; -import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator; -import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator; -import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator; -import org.apache.iotdb.db.qp.logical.sys.MoveFileOperator; -import org.apache.iotdb.db.qp.logical.sys.RemoveFileOperator; -import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator; -import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AlignByDeviceClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AlterUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AndExpressionContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AttributeClausesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AutoCreateSchemaContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ConstantContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountNodesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountTimeseriesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateTimeseriesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DateExpressionContext; -import 
org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteStorageGroupContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DeleteTimeseriesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DropUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.FillClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.FromClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.FunctionCallContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.FunctionElementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.GrantRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.GrantRoleToUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.GrantUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.GrantWatermarkEmbeddingContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.GroupByClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.InClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.InsertColumnSpecContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.InsertStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.InsertValuesSpecContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.LimitClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListAllRoleOfUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListAllUserOfRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListPrivilegesRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListPrivilegesUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListRolePrivilegesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListUserContext; -import 
org.apache.iotdb.db.qp.strategy.SqlBaseParser.ListUserPrivilegesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.LoadConfigurationStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.LoadFilesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.LoadStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.MoveFileContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.NodeNameContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.NodeNameWithoutStarContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.OffsetClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.OrExpressionContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.PredicateContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.PrefixPathContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.PrivilegesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.PropertyContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.RemoveFileContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.RevokeRoleContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.RevokeRoleFromUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.RevokeUserContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.RevokeWatermarkEmbeddingContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.RootOrIdContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SelectConstElementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SelectElementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SelectStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SetColContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SetStorageGroupContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SetTTLStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowAllTTLStatementContext; -import 
org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowChildPathsContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowDevicesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowStorageGroupContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTTLStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTimeseriesContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowVersionContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SlimitClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SoffsetClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.SuffixPathContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.TimeIntervalContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.TimeseriesPathContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.TypeClauseContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.UnsetTTLStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.UpdateStatementContext; -import org.apache.iotdb.db.qp.strategy.SqlBaseParser.WhereClauseContext; +import org.apache.iotdb.db.qp.strategy.SqlBaseParser.*; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.db.query.fill.LinearFill; import org.apache.iotdb.db.query.fill.PreviousFill; @@ -151,6 +39,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.StringContainer; +import java.io.File; +import java.time.ZoneId; +import java.util.*; + /** * This class is a listener and you can get an operator which is a logical plan. 
*/ @@ -667,34 +559,50 @@ public class LogicalGenerator extends SqlBaseBaseListener { } @Override - public void enterGroupByClause(GroupByClauseContext ctx) { - super.enterGroupByClause(ctx); + public void enterGroupByFillClause(SqlBaseParser.GroupByFillClauseContext ctx) { + super.enterGroupByFillClause(ctx); queryOp.setGroupBy(true); + queryOp.setFill(true); // parse timeUnit - queryOp.setUnit(parseDuration(ctx.DURATION(0).getText())); + queryOp.setUnit(parseDuration(ctx.DURATION().getText())); queryOp.setSlidingStep(queryOp.getUnit()); - // parse sliding step - if (ctx.DURATION().size() == 2) { - queryOp.setSlidingStep(parseDuration(ctx.DURATION(1).getText())); - if (queryOp.getSlidingStep() < queryOp.getUnit()) { - throw new SQLParserException( - "The third parameter sliding step shouldn't be smaller than the second parameter time interval."); + + parseTimeInterval(ctx.timeInterval()); + + FillClauseContext fillClauseContext = ctx.fillClause(); + super.enterFillClause(fillClauseContext); + List<TypeClauseContext> list = fillClauseContext.typeClause(); + Map<TSDataType, IFill> fillTypes = new EnumMap<>(TSDataType.class); + for (TypeClauseContext typeClause : list) { + // group by fill doesn't support linear fill + if (typeClause.linearClause() != null) { + throw new SQLParserException("group by fill doesn't support linear fill"); + } + // all type use the same fill way + if (SQLConstant.ALL.equals(typeClause.dataType().getText())) { + + break; + } else { + parseTypeClause(typeClause, fillTypes); } } + queryOp.setFill(true); + queryOp.setFillTypes(fillTypes); + } + private void parseTimeInterval(TimeIntervalContext timeIntervalContext) { long startTime; long endTime; - TimeIntervalContext timeInterval = ctx.timeInterval(); - if (timeInterval.timeValue(0).INT() != null) { - startTime = Long.parseLong(timeInterval.timeValue(0).INT().getText()); + if (timeIntervalContext.timeValue(0).INT() != null) { + startTime = 
Long.parseLong(timeIntervalContext.timeValue(0).INT().getText()); } else { - startTime = parseTimeFormat(timeInterval.timeValue(0).dateFormat().getText()); + startTime = parseTimeFormat(timeIntervalContext.timeValue(0).dateFormat().getText()); } - if (timeInterval.timeValue(1).INT() != null) { - endTime = Long.parseLong(timeInterval.timeValue(1).INT().getText()); + if (timeIntervalContext.timeValue(1).INT() != null) { + endTime = Long.parseLong(timeIntervalContext.timeValue(1).INT().getText()); } else { - endTime = parseTimeFormat(timeInterval.timeValue(1).dateFormat().getText()); + endTime = parseTimeFormat(timeIntervalContext.timeValue(1).dateFormat().getText()); } queryOp.setStartTime(startTime); @@ -702,6 +610,26 @@ public class LogicalGenerator extends SqlBaseBaseListener { } @Override + public void enterGroupByClause(GroupByClauseContext ctx) { + super.enterGroupByClause(ctx); + queryOp.setGroupBy(true); + + // parse timeUnit + queryOp.setUnit(parseDuration(ctx.DURATION(0).getText())); + queryOp.setSlidingStep(queryOp.getUnit()); + // parse sliding step + if (ctx.DURATION().size() == 2) { + queryOp.setSlidingStep(parseDuration(ctx.DURATION(1).getText())); + if (queryOp.getSlidingStep() < queryOp.getUnit()) { + throw new SQLParserException( + "The third parameter sliding step shouldn't be smaller than the second parameter time interval."); + } + } + + parseTimeInterval(ctx.timeInterval()); + } + + @Override public void enterFillClause(FillClauseContext ctx) { super.enterFillClause(ctx); FilterOperator filterOperator = queryOp.getFilterOperator(); @@ -725,7 +653,8 @@ public class LogicalGenerator extends SqlBaseBaseListener { } int defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval(); - if (ctx.linearClause() != null) { + + if (ctx.linearClause() != null) { // linear if (ctx.linearClause().DURATION(0) != null) { long beforeRange = parseDuration(ctx.linearClause().DURATION(0).getText()); long afterRange = 
parseDuration(ctx.linearClause().DURATION(1).getText()); @@ -733,13 +662,20 @@ public class LogicalGenerator extends SqlBaseBaseListener { } else { fillTypes.put(dataType, new LinearFill(defaultFillInterval, defaultFillInterval)); } - } else { + } else if (ctx.previousClause() != null) { // previous if (ctx.previousClause().DURATION() != null) { long preRange = parseDuration(ctx.previousClause().DURATION().getText()); fillTypes.put(dataType, new PreviousFill(preRange)); } else { fillTypes.put(dataType, new PreviousFill(defaultFillInterval)); } + } else { // previous until last + if (ctx.previousClause().DURATION() != null) { + long preRange = parseDuration(ctx.previousClause().DURATION().getText()); + fillTypes.put(dataType, new PreviousFill(preRange, true)); + } else { + fillTypes.put(dataType, new PreviousFill(defaultFillInterval, true)); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index ee44e56..3e22199 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -18,14 +18,6 @@ */ package org.apache.iotdb.db.qp.strategy; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.LogicalOperatorException; @@ -35,58 +27,19 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; -import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; -import 
org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator; -import org.apache.iotdb.db.qp.logical.crud.FilterOperator; -import org.apache.iotdb.db.qp.logical.crud.InsertOperator; -import org.apache.iotdb.db.qp.logical.crud.QueryOperator; -import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; -import org.apache.iotdb.db.qp.logical.sys.CountOperator; -import org.apache.iotdb.db.qp.logical.sys.CreateTimeSeriesOperator; -import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator; -import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator; -import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator; -import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator; -import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator; -import org.apache.iotdb.db.qp.logical.sys.MoveFileOperator; -import org.apache.iotdb.db.qp.logical.sys.RemoveFileOperator; -import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator; -import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator; -import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator; +import org.apache.iotdb.db.qp.logical.crud.*; +import org.apache.iotdb.db.qp.logical.sys.*; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; -import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; -import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; -import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; -import org.apache.iotdb.db.qp.physical.crud.QueryPlan; -import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; -import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; -import org.apache.iotdb.db.qp.physical.sys.CountPlan; -import 
org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; -import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan; -import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; -import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; -import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan; -import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan; -import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan; -import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; -import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; -import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan; -import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; -import org.apache.iotdb.db.qp.physical.sys.ShowPlan; +import org.apache.iotdb.db.qp.physical.crud.*; +import org.apache.iotdb.db.qp.physical.sys.*; import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType; -import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; -import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.service.TSServiceImpl; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.IExpression; +import java.util.*; + /** * Used to convert logical operator to physical plan */ @@ -215,7 +168,16 @@ public class PhysicalGenerator { throws QueryProcessException { QueryPlan queryPlan; - if (queryOperator.isGroupBy()) { + if (queryOperator.isGroupBy() && queryOperator.isFill()) { + queryPlan = new GroupByFillPlan(); + ((GroupByFillPlan) queryPlan).setInterval(queryOperator.getUnit()); + ((GroupByFillPlan) queryPlan).setSlidingStep(queryOperator.getSlidingStep()); + ((GroupByFillPlan) queryPlan).setStartTime(queryOperator.getStartTime()); + ((GroupByFillPlan) queryPlan).setEndTime(queryOperator.getEndTime()); + ((GroupByFillPlan) queryPlan) + .setAggregations(queryOperator.getSelectOperator().getAggregations()); + ((GroupByFillPlan) 
queryPlan).setFillType(queryOperator.getFillTypes()); + } else if (queryOperator.isGroupBy()) { queryPlan = new GroupByPlan(); ((GroupByPlan) queryPlan).setInterval(queryOperator.getUnit()); ((GroupByPlan) queryPlan).setSlidingStep(queryOperator.getSlidingStep()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java index 94d290e..47a5590 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java @@ -21,9 +21,12 @@ package org.apache.iotdb.db.query.dataset.groupby; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Pair; +import java.io.IOException; + public abstract class GroupByEngineDataSet extends QueryDataSet { protected long queryId; @@ -75,6 +78,13 @@ public abstract class GroupByEngineDataSet extends QueryDataSet { } } + @Override + protected abstract RowRecord nextWithoutConstraint() throws IOException; + + public long getStartTime() { + return startTime; + } + @TestOnly public Pair<Long, Long> nextTimePartition() { hasCachedTimeInterval = false; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java new file mode 100644 index 0000000..1bfc43b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java @@ -0,0 +1,85 @@ +package org.apache.iotdb.db.query.dataset.groupby; + +import org.apache.iotdb.db.exception.StorageEngineException; +import 
org.apache.iotdb.db.exception.query.UnSupportedFillTypeException; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.fill.IFill; +import org.apache.iotdb.db.query.fill.PreviousFill; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class GroupByFillDataSet extends QueryDataSet { + + private GroupByEngineDataSet groupByEngineDataSet; + private Map<TSDataType, IFill> fillTypes; + private Object[] previousValue; + private long[] lastTimeArray; + + public GroupByFillDataSet(List<Path> paths, List<TSDataType> dataTypes, GroupByEngineDataSet groupByEngineDataSet, + Map<TSDataType, IFill> fillTypes, QueryContext context) + throws StorageEngineException, IOException, UnSupportedFillTypeException { + super(paths, dataTypes); + this.groupByEngineDataSet = groupByEngineDataSet; + this.fillTypes = fillTypes; + initPreviousParis(context); + initLastTimeArray(); + } + + private void initPreviousParis(QueryContext context) throws StorageEngineException, IOException, UnSupportedFillTypeException { + previousValue = new Object[paths.size()]; + for (int i = 0; i < paths.size(); i++) { + Path path = paths.get(i); + TSDataType dataType = dataTypes.get(i); + IFill fill = new PreviousFill(dataType, groupByEngineDataSet.getStartTime(), -1L); + fill.constructReaders(path, context); + + TimeValuePair timeValuePair = fill.getFillResult(); + if (timeValuePair == null || timeValuePair.getValue() == null) { + previousValue[i] = null; + } else { + previousValue[i] = timeValuePair.getValue().getValue(); + } + } + } + + private void initLastTimeArray() { + 
lastTimeArray = new long[paths.size()]; + Arrays.fill(lastTimeArray, -1L); + + } + + @Override + protected boolean hasNextWithoutConstraint() { + return groupByEngineDataSet.hasNextWithoutConstraint(); + } + + @Override + protected RowRecord nextWithoutConstraint() throws IOException { + RowRecord rowRecord = groupByEngineDataSet.nextWithoutConstraint(); + + for (int i = 0; i < paths.size(); i++) { + Field field = rowRecord.getFields().get(i); + // current group by result is null + if (field.getDataType() == null) { + // the previous value is not null and (fill type is not previous until last or now time is before last time) + if (previousValue[i] != null + && (!((PreviousFill)fillTypes.get(field.getDataType())).isUntilLast() || rowRecord.getTimestamp() <= lastTimeArray[i])) { + rowRecord.getFields().set(i, Field.getField(previousValue, field.getDataType())); + } + } else { + // use now value update previous value + previousValue[i] = field.getObjectValue(field.getDataType()); + } + } + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java index 31f7081..a667d4c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/IQueryRouter.java @@ -19,17 +19,15 @@ package org.apache.iotdb.db.query.executor; -import java.io.IOException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; -import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; -import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; -import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.*; import org.apache.iotdb.db.query.context.QueryContext; import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import java.io.IOException; + public interface IQueryRouter { /** @@ -55,4 +53,8 @@ public interface IQueryRouter { */ QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context) throws StorageEngineException, QueryProcessException, IOException; + + /** Execute a group by query with fill, as described by the given GroupByFillPlan. */ QueryDataSet groupByFill(GroupByFillPlan groupByFillPlan, QueryContext context) + throws QueryFilterOptimizationException, StorageEngineException, + QueryProcessException, IOException; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 3f27bb4..975c2fd 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -24,11 +24,10 @@ import java.util.List; import java.util.Map; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; -import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; -import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; -import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.*; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet; +import org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet; import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet; import org.apache.iotdb.db.query.dataset.groupby.GroupByWithoutValueFilterDataSet; import org.apache.iotdb.db.query.fill.IFill; @@ -149,4 +148,12 @@ public class QueryRouter implements IQueryRouter { return fillQueryExecutor.execute(context); } + /** Runs groupBy for the plan, casts the result to GroupByEngineDataSet, and wraps it in a GroupByFillDataSet built from the plan's deduplicated paths, deduplicated data types and fill types. */ @Override + public QueryDataSet 
groupByFill(GroupByFillPlan groupByFillPlan, QueryContext context) + throws QueryFilterOptimizationException, StorageEngineException, QueryProcessException, IOException { + GroupByEngineDataSet groupByEngineDataSet = (GroupByEngineDataSet) groupBy(groupByFillPlan, context); + return new GroupByFillDataSet(groupByFillPlan.getDeduplicatedPaths(), groupByFillPlan.getDeduplicatedDataTypes(), + groupByEngineDataSet, groupByFillPlan.getFillType(), context); + } + } diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java index 3d92fb6..441075b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java +++ b/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java @@ -31,15 +31,26 @@ public class PreviousFill extends IFill { private long beforeRange; private BatchData batchData; + private boolean untilLast; public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) { + this(dataType, queryTime, beforeRange, false); + } + + public PreviousFill(long beforeRange) { + this(beforeRange, false); + } + + public PreviousFill(TSDataType dataType, long queryTime, long beforeRange, boolean untilLast) { super(dataType, queryTime); this.beforeRange = beforeRange; batchData = new BatchData(); + this.untilLast = untilLast; } - public PreviousFill(long beforeRange) { + public PreviousFill(long beforeRange, boolean untilLast) { this.beforeRange = beforeRange; + this.untilLast = untilLast; } @Override @@ -83,4 +94,8 @@ public class PreviousFill extends IFill { } return beforePair; } + + /** Returns the untilLast flag given at construction; the constructors without that parameter set it to false. */ public boolean isUntilLast() { + return untilLast; + } }
