This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/ABDwithView
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7cee9c5e6929642010a49fdba41cd63d5bf16477
Author: Minghui Liu <[email protected]>
AuthorDate: Tue Jun 27 00:10:24 2023 +0800

    finish
---
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 16 +++++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 71 +++++++++++++++-------
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |  4 +-
 .../mpp/plan/analyze/ExpressionTypeAnalyzer.java   | 11 ++--
 .../visitor/GetMeasurementExpressionVisitor.java   | 22 ++++---
 ...catDeviceAndBindSchemaForExpressionVisitor.java | 30 ++++++++-
 ...ncatDeviceAndBindSchemaForPredicateVisitor.java | 30 ++++++++-
 .../plan/planner/distribution/SourceRewriter.java  | 13 ++--
 8 files changed, 150 insertions(+), 47 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index ec8e5361627..496bd1cf2b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -126,6 +126,9 @@ public class Analysis {
   // Query Analysis (used in ALIGN BY DEVICE)
   
/////////////////////////////////////////////////////////////////////////////////////////////////
 
+  // map from output device name to queried devices
+  private Map<String, List<String>> outputDeviceToQueriedDevicesMap;
+
   // map from device name to series/aggregation under this device
   private Map<String, Set<Expression>> deviceToSourceExpressions;
 
@@ -577,6 +580,10 @@ public class Analysis {
     this.expressionTypes.putAll(types);
   }
 
+  public void setExpressionType(Expression expression, TSDataType type) {
+    this.expressionTypes.put(NodeRef.of(expression), type);
+  }
+
   public Set<Expression> getDeviceViewOutputExpressions() {
     return deviceViewOutputExpressions;
   }
@@ -709,4 +716,13 @@ public class Analysis {
   public void setTimeseriesOrderingForLastQuery(Ordering 
timeseriesOrderingForLastQuery) {
     this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery;
   }
+
+  public Map<String, List<String>> getOutputDeviceToQueriedDevicesMap() {
+    return outputDeviceToQueriedDevicesMap;
+  }
+
+  public void setOutputDeviceToQueriedDevicesMap(
+      Map<String, List<String>> outputDeviceToQueriedDevicesMap) {
+    this.outputDeviceToQueriedDevicesMap = outputDeviceToQueriedDevicesMap;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 1aa437f1301..a980b0a4f52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -263,6 +263,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         } else {
           schemaTree = schemaFetcher.fetchSchema(patternTree, context);
         }
+
         // If there is no leaf node in the schema tree, the query should be 
completed immediately
         if (schemaTree.isEmpty()) {
           return finishQuery(queryStatement, analysis);
@@ -270,14 +271,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
         // make sure paths in logical view is fetched
         updateSchemaTreeByViews(analysis, schemaTree);
-        if (analysis.useLogicalView()) {
-          if (queryStatement.isAlignByDevice()) {
-            throw new SemanticException("Views cannot be used in ALIGN BY DEVICE query yet.");
-          }
-          if (queryStatement.isGroupByTag()) {
-            throw new SemanticException("Views cannot be used in GROUP BY TAGS query yet.");
-          }
-        }
       } finally {
         logger.debug("[EndFetchSchema]");
         QueryPlanCostMetricSet.getInstance()
@@ -483,6 +476,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     } catch (Exception e) {
       throw new SemanticException(e);
     }
+    analysis.setUseLogicalView(useLogicalView);
+    if (useLogicalView && ((QueryStatement) 
analysis.getStatement()).isGroupByTag()) {
+      throw new SemanticException("Views cannot be used in GROUP BY TAGS query yet.");
+    }
+
     if (needToReFetch) {
       ISchemaTree viewSchemaTree = this.schemaFetcher.fetchSchema(patternTree, 
null);
       originSchemaTree.mergeSchemaTree(viewSchemaTree);
@@ -490,7 +488,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       allDatabases.addAll(originSchemaTree.getDatabases());
       originSchemaTree.setDatabases(allDatabases);
     }
-    analysis.setUseLogicalView(useLogicalView);
   }
 
   private Map<Integer, List<Pair<Expression, String>>> analyzeSelect(
@@ -599,10 +596,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         noMeasurementDevices.remove(device);
         for (Expression expression : selectExpressionsOfOneDevice) {
           Expression measurementExpression =
-              ExpressionAnalyzer.getMeasurementExpression(expression);
+              ExpressionAnalyzer.getMeasurementExpression(expression, 
analysis);
           measurementToDeviceSelectExpressions
               .computeIfAbsent(measurementExpression, key -> new 
LinkedHashMap<>())
-              .put(device.getFullPath(), 
ExpressionAnalyzer.removeAliasFromExpression(expression));
+              .put(device.getFullPath(), expression);
         }
       }
 
@@ -645,13 +642,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
               deviceToSelectExpressionsOfOneMeasurement.entrySet()) {
             String deviceName = deviceNameSelectExpressionEntry.getKey();
             Expression expression = deviceNameSelectExpressionEntry.getValue();
-
-            Expression expressionWithoutAlias =
-                ExpressionAnalyzer.removeAliasFromExpression(expression);
-            analyzeExpression(analysis, expressionWithoutAlias);
+            analyzeExpression(analysis, expression);
             deviceToSelectExpressions
                 .computeIfAbsent(deviceName, key -> new LinkedHashSet<>())
-                .add(expressionWithoutAlias);
+                .add(expression);
           }
           paginationController.consumeLimit();
         } else {
@@ -714,7 +708,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
       conJunctions.addAll(
           expressionsInHaving.stream()
-              .map(ExpressionAnalyzer::getMeasurementExpression)
+              .map(expression -> 
ExpressionAnalyzer.getMeasurementExpression(expression, analysis))
               .collect(Collectors.toList()));
 
       for (Expression expression : expressionsInHaving) {
@@ -892,7 +886,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     for (Pair<Expression, String> outputExpressionAndAlias : 
outputExpressions) {
       FunctionExpression rawExpression = (FunctionExpression) 
outputExpressionAndAlias.getLeft();
       FunctionExpression measurementExpression =
-          (FunctionExpression) 
ExpressionAnalyzer.getMeasurementExpression(rawExpression);
+          (FunctionExpression) 
ExpressionAnalyzer.getMeasurementExpression(rawExpression, analysis);
       outputExpressionToRawExpressionsMap
           .computeIfAbsent(measurementExpression, v -> new HashSet<>())
           .add(rawExpression);
@@ -1084,7 +1078,32 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
             
.addAll(ExpressionAnalyzer.searchSourceExpressions(whereExpression));
       }
     }
+
+    Map<String, List<String>> outputDeviceToQueriedDevicesMap = new 
LinkedHashMap<>();
+    for (Map.Entry<String, Set<Expression>> deviceSourceExpressionsEntry :
+        deviceToSourceExpressions.entrySet()) {
+      String deviceName = deviceSourceExpressionsEntry.getKey();
+      Set<Expression> sourceExpressionsUnderDevice = 
deviceSourceExpressionsEntry.getValue();
+      Set<Expression> actualSourceExpressions = new HashSet<>();
+
+      Set<String> queriedDevices = new HashSet<>();
+      for (Expression expression : sourceExpressionsUnderDevice) {
+        
queriedDevices.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
+        
actualSourceExpressions.add(ExpressionAnalyzer.removeAliasFromExpression(expression));
+      }
+      if (queriedDevices.size() > 1) {
+        throw new SemanticException(
+            "Cross-device queries are not supported in ALIGN BY DEVICE queries.");
+      }
+      if (actualSourceExpressions.size() < 
sourceExpressionsUnderDevice.size()) {
+        throw new SemanticException(
+            "Views representing the same data source cannot be queried concurrently in ALIGN BY DEVICE queries.");
+      }
+      outputDeviceToQueriedDevicesMap.put(deviceName, new 
ArrayList<>(queriedDevices));
+    }
+
     analysis.setDeviceToSourceExpressions(deviceToSourceExpressions);
+    
analysis.setOutputDeviceToQueriedDevicesMap(outputDeviceToQueriedDevicesMap);
   }
 
   private void analyzeSource(Analysis analysis, QueryStatement queryStatement) 
{
@@ -1233,7 +1252,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       }
       for (Expression expression : outputExpressionsUnderDevice) {
         outputColumns.add(
-            
ExpressionAnalyzer.getMeasurementExpression(expression).getOutputSymbol());
+            ExpressionAnalyzer.getMeasurementExpression(expression, 
analysis).getOutputSymbol());
       }
       deviceToOutputColumnsMap.put(deviceName, outputColumns);
     }
@@ -1356,8 +1375,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   }
 
   private TSDataType analyzeExpression(Analysis analysis, Expression 
expression) {
-    ExpressionTypeAnalyzer.analyzeExpression(analysis, expression);
-    return analysis.getType(expression);
+    return ExpressionTypeAnalyzer.analyzeExpression(analysis, expression);
+  }
+
+  private void setExpressionType(Analysis analysis, Expression expression, 
TSDataType type) {
+    analysis.setExpressionType(expression, type);
   }
 
   private void analyzeDeviceToGroupBy(
@@ -1476,7 +1498,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         }
 
         Expression devicerViewExpression =
-            ExpressionAnalyzer.getMeasurementExpression(expressionForItem);
+            ExpressionAnalyzer.getMeasurementExpression(expressionForItem, 
analysis);
         analyzeExpression(analysis, devicerViewExpression);
 
         deviceViewOrderByExpression.add(devicerViewExpression);
@@ -1629,7 +1651,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis, QueryStatement queryStatement, ISchemaTree 
schemaTree) {
     Set<String> deviceSet = new HashSet<>();
     if (queryStatement.isAlignByDevice()) {
-      deviceSet = analysis.getDeviceToSourceExpressions().keySet();
+      deviceSet =
+          analysis.getOutputDeviceToQueriedDevicesMap().values().stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toSet());
     } else {
       for (Expression expression : analysis.getSourceExpressions()) {
         
deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 19552a72b30..afcfc133ba9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -705,8 +705,8 @@ public class ExpressionAnalyzer {
     return ((TimeSeriesOperand) expression).getPath().getDevice();
   }
 
-  public static Expression getMeasurementExpression(Expression expression) {
-    return new GetMeasurementExpressionVisitor().process(expression, null);
+  public static Expression getMeasurementExpression(Expression expression, 
Analysis analysis) {
+    return new GetMeasurementExpressionVisitor().process(expression, analysis);
   }
 
   public static Expression evaluatePredicate(Expression predicate) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionTypeAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionTypeAnalyzer.java
index 1f47297c458..d87c93c127a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionTypeAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionTypeAnalyzer.java
@@ -58,11 +58,14 @@ public class ExpressionTypeAnalyzer {
 
   private ExpressionTypeAnalyzer() {}
 
-  public static void analyzeExpression(Analysis analysis, Expression 
expression) {
-    ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
-    analyzer.analyze(expression);
+  public static TSDataType analyzeExpression(Analysis analysis, Expression 
expression) {
+    if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) {
+      ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
+      analyzer.analyze(expression);
 
-    updateAnalysis(analysis, analyzer);
+      updateAnalysis(analysis, analyzer);
+    }
+    return analysis.getType(expression);
   }
 
   public static void analyzeExpression(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/GetMeasurementExpressionVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/GetMeasurementExpressionVisitor.java
index 60f346163d0..ade126ffe8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/GetMeasurementExpressionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/GetMeasurementExpressionVisitor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.plan.expression.visitor;
 
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
@@ -28,24 +30,26 @@ import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import java.util.ArrayList;
 import java.util.List;
 
-public class GetMeasurementExpressionVisitor extends ReconstructVisitor<Void> {
+public class GetMeasurementExpressionVisitor extends 
ReconstructVisitor<Analysis> {
 
   @Override
-  public Expression process(Expression expression, Void context) {
-    Expression newExpression = expression.accept(this, context);
-
+  public Expression process(Expression expression, Analysis analysis) {
     if (expression.getViewPath() != null) {
       PartialPath viewPath = expression.getViewPath();
-      newExpression.setViewPath(new PartialPath(viewPath.getMeasurement(), 
false));
+      return new TimeSeriesOperand(
+          new MeasurementPath(
+              new PartialPath(viewPath.getMeasurement(), false),
+              ExpressionTypeAnalyzer.analyzeExpression(analysis, expression)));
     }
-    return newExpression;
+    return expression.accept(this, analysis);
   }
 
   @Override
-  public Expression visitFunctionExpression(FunctionExpression 
functionExpression, Void context) {
+  public Expression visitFunctionExpression(
+      FunctionExpression functionExpression, Analysis analysis) {
     List<Expression> childExpressions = new ArrayList<>();
     for (Expression suffixExpression : functionExpression.getExpressions()) {
-      childExpressions.add(process(suffixExpression, null));
+      childExpressions.add(process(suffixExpression, analysis));
     }
     return new FunctionExpression(
         functionExpression.getFunctionName(),
@@ -54,7 +58,7 @@ public class GetMeasurementExpressionVisitor extends ReconstructVisitor<Void> {
   }
 
   @Override
-  public Expression visitTimeSeriesOperand(TimeSeriesOperand 
timeSeriesOperand, Void context) {
+  public Expression visitTimeSeriesOperand(TimeSeriesOperand 
timeSeriesOperand, Analysis analysis) {
     MeasurementPath rawPath = (MeasurementPath) timeSeriesOperand.getPath();
     PartialPath measurement = new PartialPath(rawPath.getMeasurement(), false);
     MeasurementPath measurementWithSchema =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
index 716ae1a2aec..b46afd7bc4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.plan.expression.visitor.cartesian;
 
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
@@ -35,6 +37,7 @@ import java.util.List;
 import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.cartesianProduct;
 import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
 import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands;
+import static 
org.apache.iotdb.db.mpp.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath;
 import static 
org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForAggregationNonSeriesInputExpressions;
 
 public class ConcatDeviceAndBindSchemaForExpressionVisitor
@@ -77,8 +80,31 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor
     if (actualPaths.isEmpty()) {
       return Collections.emptyList();
     }
-    List<PartialPath> noStarPaths = new ArrayList<>(actualPaths);
-    return reconstructTimeSeriesOperands(timeSeriesOperand, noStarPaths);
+
+    // process logical view
+    List<MeasurementPath> nonViewActualPaths = new ArrayList<>();
+    List<MeasurementPath> viewPaths = new ArrayList<>();
+    for (MeasurementPath measurementPath : actualPaths) {
+      if (measurementPath.getMeasurementSchema().isLogicalView()) {
+        viewPaths.add(measurementPath);
+      } else {
+        nonViewActualPaths.add(measurementPath);
+      }
+    }
+    List<Expression> reconstructTimeSeriesOperands =
+        ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand, 
nonViewActualPaths);
+    // handle logical views
+    for (MeasurementPath measurementPath : viewPaths) {
+      Expression replacedExpression = transformViewPath(measurementPath, 
context.getSchemaTree());
+      if (!(replacedExpression instanceof TimeSeriesOperand)) {
+        throw new SemanticException(
+            "Only writable view timeseries are supported in ALIGN BY DEVICE queries.");
+      }
+
+      replacedExpression.setViewPath(measurementPath);
+      reconstructTimeSeriesOperands.add(replacedExpression);
+    }
+    return reconstructTimeSeriesOperands;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
index d03f5cb1b32..b471fad5530 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
@@ -37,6 +37,7 @@ import java.util.List;
 import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.cartesianProduct;
 import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
 import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands;
+import static 
org.apache.iotdb.db.mpp.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath;
 
 public class ConcatDeviceAndBindSchemaForPredicateVisitor
     extends 
CartesianProductVisitor<ConcatDeviceAndBindSchemaForPredicateVisitor.Context> {
@@ -58,12 +59,35 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor
   public List<Expression> visitTimeSeriesOperand(TimeSeriesOperand predicate, 
Context context) {
     PartialPath measurement = predicate.getPath();
     PartialPath concatPath = context.getDevicePath().concatPath(measurement);
-    List<MeasurementPath> noStarPaths =
+
+    List<MeasurementPath> nonViewPathList = new ArrayList<>();
+    List<MeasurementPath> viewPathList = new ArrayList<>();
+    List<MeasurementPath> actualPaths =
         context.getSchemaTree().searchMeasurementPaths(concatPath).left;
-    if (noStarPaths.isEmpty()) {
+    if (actualPaths.isEmpty()) {
       return Collections.singletonList(new NullOperand());
     }
-    return reconstructTimeSeriesOperands(predicate, noStarPaths);
+    for (MeasurementPath measurementPath : actualPaths) {
+      if (measurementPath.getMeasurementSchema().isLogicalView()) {
+        viewPathList.add(measurementPath);
+      } else {
+        nonViewPathList.add(measurementPath);
+      }
+    }
+
+    List<Expression> reconstructTimeSeriesOperands =
+        reconstructTimeSeriesOperands(predicate, nonViewPathList);
+    for (MeasurementPath measurementPath : viewPathList) {
+      Expression replacedExpression = transformViewPath(measurementPath, 
context.getSchemaTree());
+      if (!(replacedExpression instanceof TimeSeriesOperand)) {
+        throw new SemanticException(
+            "Only writable view timeseries are supported in ALIGN BY DEVICE queries.");
+      }
+
+      replacedExpression.setViewPath(measurementPath);
+      reconstructTimeSeriesOperands.add(replacedExpression);
+    }
+    return reconstructTimeSeriesOperands;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fea6109d195..b00d72d85a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -163,12 +163,17 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
     List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
     // Step 1: constructs DeviceViewSplit
+    Map<String, List<String>> outputDeviceToQueriedDevicesMap =
+        analysis.getOutputDeviceToQueriedDevicesMap();
     for (int i = 0; i < node.getDevices().size(); i++) {
-      String device = node.getDevices().get(i);
+      String outputDevice = node.getDevices().get(i);
       PlanNode child = node.getChildren().get(i);
-      List<TRegionReplicaSet> regionReplicaSets =
-          analysis.getPartitionInfo(device, analysis.getGlobalTimeFilter());
-      deviceViewSplits.add(new DeviceViewSplit(device, child, 
regionReplicaSets));
+      List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
+      for (String queriedDevice : 
outputDeviceToQueriedDevicesMap.get(outputDevice)) {
+        regionReplicaSets.addAll(
+            analysis.getPartitionInfo(queriedDevice, 
analysis.getGlobalTimeFilter()));
+      }
+      deviceViewSplits.add(new DeviceViewSplit(outputDevice, child, 
regionReplicaSets));
       relatedDataRegions.addAll(regionReplicaSets);
     }
 

Reply via email to