This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/LastPointFetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 381b8f0768bfc2cffe6fde2080c8b7a6955a79f0 Author: Minghui Liu <[email protected]> AuthorDate: Thu May 26 17:32:45 2022 +0800 analyze LastPointFetchStatement --- .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 71 +++++++++++++++------- .../statement/crud/LastPointFetchStatement.java | 9 +-- 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index dc168a55a7..26d3bcd95c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.mpp.plan.statement.crud.LastPointFetchStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.mpp.plan.statement.literal.Literal; import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement; @@ -167,27 +168,7 @@ public class Analyzer { throw new SemanticException("Only time filters are supported in LAST query"); } - List<MeasurementPath> allSelectedPath = schemaTree.getAllMeasurement(); - Set<Expression> sourceExpressions = - allSelectedPath.stream() - .map(TimeSeriesOperand::new) - .collect(Collectors.toCollection(LinkedHashSet::new)); - sourceExpressions.forEach( - expression -> ExpressionAnalyzer.updateTypeProvider(expression, typeProvider)); - analysis.setSourceExpressions(sourceExpressions); - - analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER); - typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, TSDataType.TEXT); - 
typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT); - typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT); - - Set<String> deviceSet = - sourceExpressions.stream() - .map(ExpressionAnalyzer::getDeviceNameInSourceExpression) - .collect(Collectors.toSet()); - DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, schemaTree); - analysis.setDataPartitionInfo(dataPartition); - return analysis; + return analyzeLast(analysis, schemaTree.getAllMeasurement(), schemaTree); } // Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1 @@ -376,7 +357,7 @@ public class Analyzer { FilterNullParameter filterNullParameter = new FilterNullParameter(); filterNullParameter.setFilterNullPolicy( queryStatement.getFilterNullComponent().getWithoutPolicyType()); - List<Expression> resultFilterNullColumns = new ArrayList<>(); + List<Expression> resultFilterNullColumns; if (queryStatement.isAlignByDevice()) { resultFilterNullColumns = analyzeWithoutNullAlignByDevice( @@ -793,6 +774,38 @@ public class Analyzer { return new DatasetHeader(columnHeaders, isIgnoreTimestamp); } + private Analysis analyzeLast( + Analysis analysis, List<MeasurementPath> allSelectedPath, SchemaTree schemaTree) { + Set<Expression> sourceExpressions = + allSelectedPath.stream() + .map(TimeSeriesOperand::new) + .collect(Collectors.toCollection(LinkedHashSet::new)); + sourceExpressions.forEach( + expression -> ExpressionAnalyzer.updateTypeProvider(expression, typeProvider)); + analysis.setSourceExpressions(sourceExpressions); + + analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER); + typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, TSDataType.TEXT); + typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT); + typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT); + + Set<String> deviceSet = + allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet()); + Map<String, 
List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); + for (String devicePath : deviceSet) { + DataPartitionQueryParam queryParam = new DataPartitionQueryParam(); + queryParam.setDevicePath(devicePath); + sgNameToQueryParamsMap + .computeIfAbsent( + schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>()) + .add(queryParam); + } + DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap); + analysis.setDataPartitionInfo(dataPartition); + + return analysis; + } + private DataPartition fetchDataPartitionByDevices( Set<String> deviceSet, SchemaTree schemaTree) { Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); @@ -845,6 +858,20 @@ public class Analyzer { } } + @Override + public Analysis visitLastPointFetch( + LastPointFetchStatement statement, MPPQueryContext context) { + context.setQueryType(QueryType.READ); + + Analysis analysis = new Analysis(); + analysis.setStatement(statement); + + SchemaTree schemaTree = new SchemaTree(); + schemaTree.setStorageGroups(statement.getStorageGroups()); + + return analyzeLast(analysis, statement.getSelectedPaths(), schemaTree); + } + @Override public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java index 06265e5f8e..7a26b042c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java @@ -42,13 +42,8 @@ public class LastPointFetchStatement extends Statement { return selectedPaths; } - public String getBelongedStorageGroup(String pathName) { - for (String storageGroup : storageGroups) { - if
(pathName.startsWith(storageGroup + ".")) { - return storageGroup; - } - } - throw new RuntimeException("No matched storage group. Please check the path " + pathName); + public List<String> getStorageGroups() { + return storageGroups; } @Override
