This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f78561434d Implement LastPointFetchStatement (#6036)
f78561434d is described below
commit f78561434d4d860cf6a80ecbd8abc8a970303286
Author: liuminghui233 <[email protected]>
AuthorDate: Thu May 26 19:35:38 2022 +0800
Implement LastPointFetchStatement (#6036)
---
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 73 +++++++++++++++-------
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 2 +-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 9 ++-
.../db/mpp/plan/statement/StatementVisitor.java | 7 ++-
.../LastPointFetchStatement.java} | 40 +++++-------
.../SchemaFetchStatement.java | 2 +-
6 files changed, 83 insertions(+), 50 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index dc168a55a7..2e2bca5fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -56,6 +56,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.LastPointFetchStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
@@ -67,7 +69,6 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStat
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -167,27 +168,7 @@ public class Analyzer {
throw new SemanticException("Only time filters are supported in
LAST query");
}
- List<MeasurementPath> allSelectedPath =
schemaTree.getAllMeasurement();
- Set<Expression> sourceExpressions =
- allSelectedPath.stream()
- .map(TimeSeriesOperand::new)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- sourceExpressions.forEach(
- expression -> ExpressionAnalyzer.updateTypeProvider(expression,
typeProvider));
- analysis.setSourceExpressions(sourceExpressions);
-
- analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER);
- typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES,
TSDataType.TEXT);
- typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT);
- typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE,
TSDataType.TEXT);
-
- Set<String> deviceSet =
- sourceExpressions.stream()
- .map(ExpressionAnalyzer::getDeviceNameInSourceExpression)
- .collect(Collectors.toSet());
- DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet,
schemaTree);
- analysis.setDataPartitionInfo(dataPartition);
- return analysis;
+ return analyzeLast(analysis, schemaTree.getAllMeasurement(),
schemaTree);
}
// Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
@@ -376,7 +357,7 @@ public class Analyzer {
FilterNullParameter filterNullParameter = new FilterNullParameter();
filterNullParameter.setFilterNullPolicy(
queryStatement.getFilterNullComponent().getWithoutPolicyType());
- List<Expression> resultFilterNullColumns = new ArrayList<>();
+ List<Expression> resultFilterNullColumns;
if (queryStatement.isAlignByDevice()) {
resultFilterNullColumns =
analyzeWithoutNullAlignByDevice(
@@ -793,6 +774,38 @@ public class Analyzer {
return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
}
+ /**
+  * Populates {@code analysis} for a LAST query over the given measurement paths.
+  *
+  * <p>Registers one {@link TimeSeriesOperand} per selected path as a source expression, sets the
+  * LAST-query response header (all three header columns are typed TEXT), and fetches the data
+  * partition for every device that owns a selected path.
+  *
+  * @param analysis the analysis object to fill in; the same instance is returned
+  * @param allSelectedPath the measurement paths whose last points are requested
+  * @param schemaTree used only to look up the storage group each device belongs to
+  * @return {@code analysis}, with source expressions, header and data partition set
+  */
+ private Analysis analyzeLast(
+     Analysis analysis, List<MeasurementPath> allSelectedPath, SchemaTree schemaTree) {
+   // Insertion-ordered and de-duplicated: one operand per distinct selected path.
+   Set<Expression> lastSources = new LinkedHashSet<>();
+   for (MeasurementPath selectedPath : allSelectedPath) {
+     lastSources.add(new TimeSeriesOperand(selectedPath));
+   }
+   for (Expression lastSource : lastSources) {
+     ExpressionAnalyzer.updateTypeProvider(lastSource, typeProvider);
+   }
+   analysis.setSourceExpressions(lastSources);
+
+   // Fixed result layout of a LAST query.
+   analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER);
+   typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, TSDataType.TEXT);
+   typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT);
+   typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT);
+
+   // Group one query param per distinct device under the storage group the device belongs to.
+   Set<String> devices =
+       allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet());
+   Map<String, List<DataPartitionQueryParam>> paramsByStorageGroup = new HashMap<>();
+   for (String device : devices) {
+     DataPartitionQueryParam deviceParam = new DataPartitionQueryParam();
+     deviceParam.setDevicePath(device);
+     String storageGroup = schemaTree.getBelongedStorageGroup(device);
+     List<DataPartitionQueryParam> groupParams =
+         paramsByStorageGroup.computeIfAbsent(storageGroup, sg -> new ArrayList<>());
+     groupParams.add(deviceParam);
+   }
+   analysis.setDataPartitionInfo(partitionFetcher.getDataPartition(paramsByStorageGroup));
+
+   return analysis;
+ }
+
private DataPartition fetchDataPartitionByDevices(
Set<String> deviceSet, SchemaTree schemaTree) {
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
@@ -845,6 +858,20 @@ public class Analyzer {
}
}
+ /**
+  * Analyzes an internal {@link LastPointFetchStatement}.
+  *
+  * <p>Marks the query as a READ, then delegates to {@code analyzeLast} using the selected paths
+  * and storage groups carried by the statement itself.
+  *
+  * @param statement the last-point fetch statement, supplying selected paths and storage groups
+  * @param context the query context; its query type is set to {@code QueryType.READ}
+  * @return a new {@link Analysis} bound to {@code statement}
+  */
+ @Override
+ public Analysis visitLastPointFetch(
+     LastPointFetchStatement statement, MPPQueryContext context) {
+   context.setQueryType(QueryType.READ);
+
+   Analysis analysis = new Analysis();
+   analysis.setStatement(statement);
+
+   // Seed the new tree with the statement's storage groups: analyzeLast resolves each device's
+   // storage group through this tree. The previous code assigned the tree's own value back to
+   // itself, which was a no-op and left the statement's storage groups unused.
+   SchemaTree schemaTree = new SchemaTree();
+   schemaTree.setStorageGroups(statement.getStorageGroups());
+
+   return analyzeLast(analysis, statement.getSelectedPaths(), schemaTree);
+ }
+
@Override
public Analysis visitInsert(InsertStatement insertStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 49b57c79b5..e92d9b0cbe 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -32,9 +32,9 @@ import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 4950a6427f..ad1bccb1ee 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -46,6 +46,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.LastPointFetchStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
@@ -55,7 +57,6 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStat
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -281,6 +282,12 @@ public class LogicalPlanner {
return planBuilder.getRoot();
}
+ /**
+  * Builds the logical plan for an internal {@link LastPointFetchStatement}.
+  *
+  * <p>Plans a LAST query over the source expressions recorded in {@code analysis}; {@code null}
+  * is passed as the second argument of {@code planLast}, as in the original code.
+  *
+  * @param lastPointFetchStatement the statement being planned (not read directly here)
+  * @param context the query context handed to the plan builder
+  * @return the root node of the resulting plan
+  */
+ @Override // keeps this in sync with StatementVisitor#visitLastPointFetch, like sibling visit methods
+ public PlanNode visitLastPointFetch(
+     LastPointFetchStatement lastPointFetchStatement, MPPQueryContext context) {
+   LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+   return planBuilder.planLast(analysis.getSourceExpressions(), null).getRoot();
+ }
+
@Override
public PlanNode visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext
context) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 650d15d160..e279ba9cb5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.LastPointFetchStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
@@ -38,7 +40,6 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceS
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
@@ -139,6 +140,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(queryStatement, context);
}
+ public R visitLastPointFetch(LastPointFetchStatement
lastPointFetchStatement, C context) {
+ return visitStatement(lastPointFetchStatement, context);
+ }
+
// Insert Statement
public R visitInsert(InsertStatement insertStatement, C context) {
return visitStatement(insertStatement, context);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SchemaFetchStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/LastPointFetchStatement.java
similarity index 55%
copy from
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SchemaFetchStatement.java
copy to
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/LastPointFetchStatement.java
index f8c1bbbe70..58347a68e4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SchemaFetchStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/LastPointFetchStatement.java
@@ -17,48 +17,42 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.statement.metadata;
+package org.apache.iotdb.db.mpp.plan.statement.internal;
-import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import java.util.List;
-public class SchemaFetchStatement extends Statement {
+public class LastPointFetchStatement extends Statement {
- private PathPatternTree patternTree;
+ private final List<MeasurementPath> selectedPaths;
- private SchemaPartition schemaPartition;
+ // used for fetch data partition
+ private final List<String> storageGroups;
- public SchemaFetchStatement(PathPatternTree patternTree) {
- super();
- this.patternTree = patternTree;
- setType(StatementType.FETCH_SCHEMA);
+ public LastPointFetchStatement(List<MeasurementPath> selectedPaths,
List<String> storageGroups) {
+ this.selectedPaths = selectedPaths;
+ this.storageGroups = storageGroups;
}
- public PathPatternTree getPatternTree() {
- return patternTree;
+ public List<MeasurementPath> getSelectedPaths() {
+ return selectedPaths;
}
- public SchemaPartition getSchemaPartition() {
- return schemaPartition;
- }
-
- public void setSchemaPartition(SchemaPartition schemaPartition) {
- this.schemaPartition = schemaPartition;
+ public List<String> getStorageGroups() {
+ return storageGroups;
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
- return visitor.visitSchemaFetch(this, context);
+ public List<? extends PartialPath> getPaths() {
+ return selectedPaths;
}
@Override
- public List<PartialPath> getPaths() {
- return patternTree.splitToPathList();
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitLastPointFetch(this, context);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SchemaFetchStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SchemaFetchStatement.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
index f8c1bbbe70..9ab133be35 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SchemaFetchStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.statement.metadata;
+package org.apache.iotdb.db.mpp.plan.statement.internal;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;