This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 790e57ff71 [IOTDB-3283] Implement Analyzer & LogicalPlanner for last query (#6017)
790e57ff71 is described below
commit 790e57ff7171aea20066033caaf6cda6074c6c7d
Author: liuminghui233 <[email protected]>
AuthorDate: Thu May 26 11:44:59 2022 +0800
[IOTDB-3283] Implement Analyzer & LogicalPlanner for last query (#6017)
---
.../iotdb/db/mpp/common/header/HeaderConstant.java | 19 ++++++-
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 59 +++++++++++++++++-----
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 23 +++++++++
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 6 +++
.../plan/node/process/LastQueryMergeNode.java | 4 +-
.../plan/node/source/AlignedLastQueryScanNode.java | 4 +-
.../plan/node/source/LastQueryScanNode.java | 6 +--
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 43 ++++++++++++++++
8 files changed, 143 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
index 875bb50428..e9951b13ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
@@ -27,6 +27,8 @@ import java.util.Collections;
public class HeaderConstant {
// column names for query statement
+ public static final String COLUMN_TIME = "Time";
+ public static final String COLUMN_VALUE = "value";
public static final String COLUMN_DEVICE = "Device";
// column names for schema statement
@@ -71,6 +73,9 @@ public class HeaderConstant {
public static final DatasetHeader countTimeSeriesHeader;
public static final DatasetHeader countLevelTimeSeriesHeader;
+ // dataset header for last query
+ public static final DatasetHeader LAST_QUERY_HEADER;
+
static {
countStorageGroupHeader =
new DatasetHeader(
@@ -140,9 +145,19 @@ public class HeaderConstant {
true);
showChildPathsHeader =
new DatasetHeader(
- Arrays.asList(new ColumnHeader(COLUMN_CHILDPATHS,
TSDataType.TEXT)), true);
+ Collections.singletonList(new ColumnHeader(COLUMN_CHILDPATHS,
TSDataType.TEXT)), true);
showChildNodesHeader =
new DatasetHeader(
- Arrays.asList(new ColumnHeader(COLUMN_CHILDNODES,
TSDataType.TEXT)), true);
+ Collections.singletonList(new ColumnHeader(COLUMN_CHILDNODES,
TSDataType.TEXT)), true);
+ }
+
+ static {
+ LAST_QUERY_HEADER =
+ new DatasetHeader(
+ Arrays.asList(
+ new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_VALUE, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT)),
+ false);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index f1176643a9..dc168a55a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -37,6 +38,7 @@ import
org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FilterNullParameter;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -139,9 +141,9 @@ public class Analyzer {
// concat path and construct path pattern tree
PathPatternTree patternTree = new PathPatternTree();
- QueryStatement rewrittenStatement =
+ queryStatement =
(QueryStatement) new ConcatPathRewriter().rewrite(queryStatement,
patternTree);
- analysis.setStatement(rewrittenStatement);
+ analysis.setStatement(queryStatement);
// request schema fetch API
logger.info("{} fetch query schema...", getLogHeader());
@@ -160,6 +162,34 @@ public class Analyzer {
analysis.setGlobalTimeFilter(globalTimeFilter);
analysis.setHasValueFilter(hasValueFilter);
+ if (queryStatement.isLastQuery()) {
+ if (hasValueFilter) {
+ throw new SemanticException("Only time filters are supported in
LAST query");
+ }
+
+ List<MeasurementPath> allSelectedPath =
schemaTree.getAllMeasurement();
+ Set<Expression> sourceExpressions =
+ allSelectedPath.stream()
+ .map(TimeSeriesOperand::new)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ sourceExpressions.forEach(
+ expression -> ExpressionAnalyzer.updateTypeProvider(expression,
typeProvider));
+ analysis.setSourceExpressions(sourceExpressions);
+
+ analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER);
+ typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES,
TSDataType.TEXT);
+ typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT);
+ typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE,
TSDataType.TEXT);
+
+ Set<String> deviceSet =
+ sourceExpressions.stream()
+ .map(ExpressionAnalyzer::getDeviceNameInSourceExpression)
+ .collect(Collectors.toSet());
+ DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet,
schemaTree);
+ analysis.setDataPartitionInfo(dataPartition);
+ return analysis;
+ }
+
// Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
// outputExpressions: [<root.sg.d1.s1,null>, <root.sg.d1.s1 +
root.sg.d1.s2,t>,
// <udf(udf(root.sg.d1.s1)),null>]
@@ -390,16 +420,7 @@ public class Analyzer {
deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
}
}
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap =
new HashMap<>();
- for (String devicePath : deviceSet) {
- DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
- queryParam.setDevicePath(devicePath);
- sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(devicePath), key -> new
ArrayList<>())
- .add(queryParam);
- }
- DataPartition dataPartition =
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet,
schemaTree);
analysis.setDataPartitionInfo(dataPartition);
} catch (StatementAnalyzeException e) {
logger.error("Meet error when analyzing the query statement: ", e);
@@ -772,6 +793,20 @@ public class Analyzer {
return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
}
+ private DataPartition fetchDataPartitionByDevices(
+ Set<String> deviceSet, SchemaTree schemaTree) {
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
+ for (String devicePath : deviceSet) {
+ DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
+ queryParam.setDevicePath(devicePath);
+ sgNameToQueryParamsMap
+ .computeIfAbsent(
+ schemaTree.getBelongedStorageGroup(devicePath), key -> new
ArrayList<>())
+ .add(queryParam);
+ }
+ return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ }
+
/**
* Check datatype consistency in ALIGN BY DEVICE.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 68b0a39237..d60d210118 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -54,12 +54,15 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
@@ -137,6 +140,26 @@ public class LogicalPlanBuilder {
return this;
}
+ public LogicalPlanBuilder planLast(Set<Expression> sourceExpressions, Filter
globalTimeFilter) {
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ for (Expression sourceExpression : sourceExpressions) {
+ MeasurementPath selectPath =
+ (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
+ if (selectPath.isUnderAlignedEntity()) {
+ sourceNodeList.add(
+ new AlignedLastQueryScanNode(
+ context.getQueryId().genPlanNodeId(), new
AlignedPath(selectPath)));
+ } else {
+ sourceNodeList.add(new
LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
+ }
+ }
+
+ this.root =
+ new LastQueryMergeNode(
+ context.getQueryId().genPlanNodeId(), sourceNodeList,
globalTimeFilter);
+ return this;
+ }
+
public LogicalPlanBuilder planAggregationSource(
Set<Expression> sourceExpressions,
OrderBy scanOrder,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index e3f332b877..0e6da8c705 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -118,6 +118,12 @@ public class LogicalPlanner {
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext
context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+ if (queryStatement.isLastQuery()) {
+ return planBuilder
+ .planLast(analysis.getSourceExpressions(),
analysis.getGlobalTimeFilter())
+ .getRoot();
+ }
+
if (queryStatement.isAlignByDevice()) {
Map<String, PlanNode> deviceToSubPlanMap = new HashMap<>();
for (String deviceName :
analysis.getDeviceToSourceExpressions().keySet()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index 5923bc8942..144ea7dc76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -33,7 +33,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import static
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_COLUMN_HEADERS;
+import static
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
public class LastQueryMergeNode extends ProcessNode {
@@ -76,7 +76,7 @@ public class LastQueryMergeNode extends ProcessNode {
@Override
public List<String> getOutputColumnNames() {
- return LAST_QUERY_COLUMN_HEADERS;
+ return LAST_QUERY_HEADER_COLUMNS;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index dced79aa0a..66c8a54f34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -32,7 +32,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
-import static
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_COLUMN_HEADERS;
+import static
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
public class AlignedLastQueryScanNode extends SourceNode {
// The path of the target series which will be scanned.
@@ -91,7 +91,7 @@ public class AlignedLastQueryScanNode extends SourceNode {
@Override
public List<String> getOutputColumnNames() {
- return LAST_QUERY_COLUMN_HEADERS;
+ return LAST_QUERY_HEADER_COLUMNS;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index ab20f9548c..be97c5187d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -34,8 +34,8 @@ import java.util.Objects;
public class LastQueryScanNode extends SourceNode {
- public static final List<String> LAST_QUERY_COLUMN_HEADERS =
- ImmutableList.of("Time", "timeseries", "value", "dataType");
+ public static final List<String> LAST_QUERY_HEADER_COLUMNS =
+ ImmutableList.of("timeseries", "value", "dataType");
// The path of the target series which will be scanned.
private final MeasurementPath seriesPath;
@@ -97,7 +97,7 @@ public class LastQueryScanNode extends SourceNode {
@Override
public List<String> getOutputColumnNames() {
- return LAST_QUERY_COLUMN_HEADERS;
+ return LAST_QUERY_HEADER_COLUMNS;
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 2cb8ddbb63..e58bf4af47 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -37,11 +37,14 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -103,6 +106,46 @@ public class QueryLogicalPlanUtil {
}
}
+ /* Last Query */
+ static {
+ String sql = "SELECT last * FROM root.sg.** WHERE time > 100";
+
+ QueryId queryId = new QueryId("test");
+ List<PlanNode> sourceNodeList = new ArrayList<>();
+ sourceNodeList.add(
+ new LastQueryScanNode(
+ queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s3")));
+ sourceNodeList.add(
+ new LastQueryScanNode(
+ queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s1")));
+ sourceNodeList.add(
+ new LastQueryScanNode(
+ queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s2")));
+ sourceNodeList.add(
+ new LastQueryScanNode(
+ queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d2.s4")));
+ sourceNodeList.add(
+ new AlignedLastQueryScanNode(
+ queryId.genPlanNodeId(),
+ new AlignedPath((MeasurementPath)
schemaMap.get("root.sg.d2.a.s1"))));
+ sourceNodeList.add(
+ new AlignedLastQueryScanNode(
+ queryId.genPlanNodeId(),
+ new AlignedPath((MeasurementPath)
schemaMap.get("root.sg.d2.a.s2"))));
+ sourceNodeList.add(
+ new LastQueryScanNode(
+ queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d2.s1")));
+ sourceNodeList.add(
+ new LastQueryScanNode(
+ queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d2.s2")));
+
+ LastQueryMergeNode lastQueryMergeNode =
+ new LastQueryMergeNode(queryId.genPlanNodeId(), sourceNodeList,
TimeFilter.gt(100));
+
+ querySQLs.add(sql);
+ sqlToPlanMap.put(sql, lastQueryMergeNode);
+ }
+
/* Simple Query */
static {
String sql = "SELECT ** FROM root.sg.d2 LIMIT 10 OFFSET 10";