This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/LastPointFetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 381b8f0768bfc2cffe6fde2080c8b7a6955a79f0
Author: Minghui Liu <[email protected]>
AuthorDate: Thu May 26 17:32:45 2022 +0800

    analyze LastPointFetchStatement
---
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 71 +++++++++++++++-------
 .../statement/crud/LastPointFetchStatement.java    |  9 +--
 2 files changed, 51 insertions(+), 29 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index dc168a55a7..26d3bcd95c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -55,6 +55,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastPointFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -167,27 +168,7 @@ public class Analyzer {
             throw new SemanticException("Only time filters are supported in 
LAST query");
           }
 
-          List<MeasurementPath> allSelectedPath = 
schemaTree.getAllMeasurement();
-          Set<Expression> sourceExpressions =
-              allSelectedPath.stream()
-                  .map(TimeSeriesOperand::new)
-                  .collect(Collectors.toCollection(LinkedHashSet::new));
-          sourceExpressions.forEach(
-              expression -> ExpressionAnalyzer.updateTypeProvider(expression, 
typeProvider));
-          analysis.setSourceExpressions(sourceExpressions);
-
-          analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER);
-          typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, 
TSDataType.TEXT);
-          typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT);
-          typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, 
TSDataType.TEXT);
-
-          Set<String> deviceSet =
-              sourceExpressions.stream()
-                  .map(ExpressionAnalyzer::getDeviceNameInSourceExpression)
-                  .collect(Collectors.toSet());
-          DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, 
schemaTree);
-          analysis.setDataPartitionInfo(dataPartition);
-          return analysis;
+          return analyzeLast(analysis, schemaTree.getAllMeasurement(), 
schemaTree);
         }
 
         // Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
@@ -376,7 +357,7 @@ public class Analyzer {
           FilterNullParameter filterNullParameter = new FilterNullParameter();
           filterNullParameter.setFilterNullPolicy(
               queryStatement.getFilterNullComponent().getWithoutPolicyType());
-          List<Expression> resultFilterNullColumns = new ArrayList<>();
+          List<Expression> resultFilterNullColumns;
           if (queryStatement.isAlignByDevice()) {
             resultFilterNullColumns =
                 analyzeWithoutNullAlignByDevice(
@@ -793,6 +774,38 @@ public class Analyzer {
       return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
     }
 
+    private Analysis analyzeLast(
+        Analysis analysis, List<MeasurementPath> allSelectedPath, SchemaTree 
schemaTree) {
+      Set<Expression> sourceExpressions =
+          allSelectedPath.stream()
+              .map(TimeSeriesOperand::new)
+              .collect(Collectors.toCollection(LinkedHashSet::new));
+      sourceExpressions.forEach(
+          expression -> ExpressionAnalyzer.updateTypeProvider(expression, 
typeProvider));
+      analysis.setSourceExpressions(sourceExpressions);
+
+      analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER);
+      typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, TSDataType.TEXT);
+      typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT);
+      typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, 
TSDataType.TEXT);
+
+      Set<String> deviceSet =
+          
allSelectedPath.stream().map(MeasurementPath::getDevice).collect(Collectors.toSet());
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
+      for (String devicePath : deviceSet) {
+        DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
+        queryParam.setDevicePath(devicePath);
+        sgNameToQueryParamsMap
+            .computeIfAbsent(
+                schemaTree.getBelongedStorageGroup(devicePath), key -> new 
ArrayList<>())
+            .add(queryParam);
+      }
+      DataPartition dataPartition = 
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+      analysis.setDataPartitionInfo(dataPartition);
+
+      return analysis;
+    }
+
     private DataPartition fetchDataPartitionByDevices(
         Set<String> deviceSet, SchemaTree schemaTree) {
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
@@ -845,6 +858,20 @@ public class Analyzer {
       }
     }
 
+    @Override
+    public Analysis visitLastPointFetch(
+        LastPointFetchStatement statement, MPPQueryContext context) {
+      context.setQueryType(QueryType.READ);
+
+      Analysis analysis = new Analysis();
+      analysis.setStatement(statement);
+
+      SchemaTree schemaTree = new SchemaTree();
+      schemaTree.setStorageGroups(schemaTree.getStorageGroups());
+
+      return analyzeLast(analysis, statement.getSelectedPaths(), schemaTree);
+    }
+
     @Override
     public Analysis visitInsert(InsertStatement insertStatement, 
MPPQueryContext context) {
       context.setQueryType(QueryType.WRITE);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java
index 06265e5f8e..7a26b042c5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastPointFetchStatement.java
@@ -42,13 +42,8 @@ public class LastPointFetchStatement extends Statement {
     return selectedPaths;
   }
 
-  public String getBelongedStorageGroup(String pathName) {
-    for (String storageGroup : storageGroups) {
-      if (pathName.startsWith(storageGroup + ".")) {
-        return storageGroup;
-      }
-    }
-    throw new RuntimeException("No matched storage group. Please check the 
path " + pathName);
+  public List<String> getStorageGroups() {
+    return storageGroups;
   }
 
   @Override

Reply via email to