This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 51e6d17b821 [IOTDB-6120] Push down limit/offset in query with group by time
51e6d17b821 is described below

commit 51e6d17b82191e6661a91ee1696138480560e532
Author: YangCaiyin <[email protected]>
AuthorDate: Tue Aug 22 15:48:06 2023 +0800

    [IOTDB-6120] Push down limit/offset in query with group by time
---
 .../db/queryengine/plan/analyze/Analysis.java      |  11 +
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  76 +++---
 .../plan/optimization/LimitOffsetPushDown.java     | 117 +++++++++
 .../db/queryengine/plan/parser/ASTVisitor.java     |  15 +-
 .../plan/planner/LogicalPlanVisitor.java           |  12 +-
 .../plan/statement/component/OrderByComponent.java |   8 +-
 .../plan/statement/crud/QueryStatement.java        |  20 ++
 .../plan/optimization/LimitOffsetPushDownTest.java | 271 +++++++++++++++++++++
 8 files changed, 483 insertions(+), 47 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 1b900c0e425..0f9c4d223c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -127,6 +127,9 @@ public class Analysis {
   // Query Analysis (used in ALIGN BY DEVICE)
   
/////////////////////////////////////////////////////////////////////////////////////////////////
 
+  // the list of device names
+  private List<PartialPath> deviceList;
+
   // map from output device name to queried devices
   private Map<String, List<String>> outputDeviceToQueriedDevicesMap;
 
@@ -744,4 +747,12 @@ public class Analysis {
   public void setLastLevelUseWildcard(boolean lastLevelUseWildcard) {
     this.lastLevelUseWildcard = lastLevelUseWildcard;
   }
+
+  public void setDeviceList(List<PartialPath> deviceList) {
+    this.deviceList = deviceList;
+  }
+
+  public List<PartialPath> getDeviceList() {
+    return deviceList;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 7887a5d71a9..92d2e85c1b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -156,6 +156,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -182,6 +183,8 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHE
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.constructTargetDevice;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.constructTargetMeasurement;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.constructTargetPath;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
 import static 
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
 
 /** This visitor is used to analyze each type of Statement and returns the 
{@link Analysis}. */
@@ -248,17 +251,23 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
       List<Pair<Expression, String>> outputExpressions;
       if (queryStatement.isAlignByDevice()) {
-        Set<PartialPath> deviceSet = analyzeFrom(queryStatement, schemaTree);
+        List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
 
-        analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceSet);
-        outputExpressions = analyzeSelect(analysis, queryStatement, 
schemaTree, deviceSet);
-        if (deviceSet.isEmpty()) {
+        if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
+          // remove the device which won't appear in resultSet after 
limit/offset
+          deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, 
queryStatement);
+        }
+
+        analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
+        outputExpressions = analyzeSelect(analysis, queryStatement, 
schemaTree, deviceList);
+        if (deviceList.isEmpty()) {
           return finishQuery(queryStatement, analysis);
         }
+        analysis.setDeviceList(deviceList);
 
-        analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree, 
deviceSet);
-        analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, 
deviceSet);
-        analyzeHaving(analysis, queryStatement, schemaTree, deviceSet);
+        analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree, 
deviceList);
+        analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, 
deviceList);
+        analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
 
         analyzeDeviceToAggregation(analysis, queryStatement);
         analyzeDeviceToSourceTransform(analysis, queryStatement);
@@ -267,7 +276,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         analyzeDeviceViewOutput(analysis, queryStatement);
         analyzeDeviceViewInput(analysis, queryStatement);
 
-        analyzeInto(analysis, queryStatement, deviceSet, outputExpressions);
+        analyzeInto(analysis, queryStatement, deviceList, outputExpressions);
       } else {
         Map<Integer, List<Pair<Expression, String>>> outputExpressionMap =
             analyzeSelect(analysis, queryStatement, schemaTree);
@@ -389,15 +398,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         whereCondition.setPredicate(predicate);
       }
     }
-    if (queryStatement.isGroupByTime()) {
-      GroupByTimeComponent groupByTimeComponent = 
queryStatement.getGroupByTimeComponent();
-      Filter groupByFilter = initGroupByFilter(groupByTimeComponent);
-      if (globalTimeFilter == null) {
-        globalTimeFilter = groupByFilter;
-      } else {
-        globalTimeFilter = FilterFactory.and(globalTimeFilter, groupByFilter);
-      }
-    }
     analysis.setGlobalTimeFilter(globalTimeFilter);
     analysis.setHasValueFilter(hasValueFilter);
   }
@@ -536,11 +536,11 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return outputExpressionMap;
   }
 
-  private Set<PartialPath> analyzeFrom(QueryStatement queryStatement, 
ISchemaTree schemaTree) {
+  private List<PartialPath> analyzeFrom(QueryStatement queryStatement, 
ISchemaTree schemaTree) {
     // device path patterns in FROM clause
     List<PartialPath> devicePatternList = 
queryStatement.getFromComponent().getPrefixPaths();
 
-    Set<PartialPath> deviceSet = new LinkedHashSet<>();
+    Set<PartialPath> deviceSet = new HashSet<>();
     for (PartialPath devicePattern : devicePatternList) {
       // get all matched devices
       deviceSet.addAll(
@@ -548,21 +548,23 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
               .map(DeviceSchemaInfo::getDevicePath)
               .collect(Collectors.toList()));
     }
-    return deviceSet;
+
+    return queryStatement.getResultDeviceOrder() == Ordering.ASC
+        ? deviceSet.stream().sorted().collect(Collectors.toList())
+        : 
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
   }
 
   private List<Pair<Expression, String>> analyzeSelect(
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      Set<PartialPath> deviceSet) {
+      List<PartialPath> deviceSet) {
     List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
     Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
 
     ColumnPaginationController paginationController =
         new ColumnPaginationController(
             queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(), 
false);
-    Set<PartialPath> noMeasurementDevices = new HashSet<>(deviceSet);
 
     for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
       Expression selectExpression = resultColumn.getExpression();
@@ -578,7 +580,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         if (selectExpressionsOfOneDevice.isEmpty()) {
           continue;
         }
-        noMeasurementDevices.remove(device);
         updateMeasurementToDeviceSelectExpressions(
             analysis, measurementToDeviceSelectExpressions, device, 
selectExpressionsOfOneDevice);
       }
@@ -626,6 +627,12 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
 
     // remove devices without measurements to compute
+    Set<PartialPath> noMeasurementDevices = new HashSet<>();
+    for (PartialPath device : deviceSet) {
+      if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
+        noMeasurementDevices.add(device);
+      }
+    }
     deviceSet.removeAll(noMeasurementDevices);
 
     // when the select expression of any device is empty,
@@ -727,7 +734,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      Set<PartialPath> deviceSet) {
+      List<PartialPath> deviceSet) {
     if (!queryStatement.hasHaving()) {
       return;
     }
@@ -1191,7 +1198,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      Set<PartialPath> deviceSet) {
+      List<PartialPath> deviceSet) {
     if (!queryStatement.hasWhere()) {
       return;
     }
@@ -1448,7 +1455,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      Set<PartialPath> deviceSet) {
+      List<PartialPath> deviceSet) {
     if (queryStatement.getGroupByComponent() == null) {
       return;
     }
@@ -1521,7 +1528,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      Set<PartialPath> deviceSet) {
+      List<PartialPath> deviceSet) {
     if (!queryStatement.hasOrderByExpression()) {
       return;
     }
@@ -1681,6 +1688,10 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       return;
     }
 
+    if (queryStatement.isResultSetEmpty()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+    }
+
     GroupByTimeComponent groupByTimeComponent = 
queryStatement.getGroupByTimeComponent();
     if ((groupByTimeComponent.isIntervalByMonth() || 
groupByTimeComponent.isSlidingStepByMonth())
         && queryStatement.getResultTimeOrder() == Ordering.DESC) {
@@ -1692,6 +1703,15 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
           "The query time range should be specified in the GROUP BY TIME 
clause.");
     }
     analysis.setGroupByTimeParameter(new 
GroupByTimeParameter(groupByTimeComponent));
+
+    Filter globalTimeFilter = analysis.getGlobalTimeFilter();
+    Filter groupByFilter = initGroupByFilter(groupByTimeComponent);
+    if (globalTimeFilter == null) {
+      globalTimeFilter = groupByFilter;
+    } else {
+      globalTimeFilter = FilterFactory.and(globalTimeFilter, groupByFilter);
+    }
+    analysis.setGlobalTimeFilter(globalTimeFilter);
   }
 
   private void analyzeFill(Analysis analysis, QueryStatement queryStatement) {
@@ -1845,7 +1865,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   private void analyzeInto(
       Analysis analysis,
       QueryStatement queryStatement,
-      Set<PartialPath> deviceSet,
+      List<PartialPath> deviceSet,
       List<Pair<Expression, String>> outputExpressions) {
     if (!queryStatement.isSelectInto()) {
       return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
index 52c9b8514fe..37dd6ff17a5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.optimization;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
@@ -37,8 +38,13 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeri
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
+import 
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * <b>Optimization phase:</b> Distributed plan planning
  *
@@ -239,4 +245,115 @@ public class LimitOffsetPushDown implements PlanOptimizer 
{
       return analysis;
     }
   }
+
+  // following methods are used to push down limit/offset in group by time
+
+  // 1. push down limit/offset to group by time in align by time
+
+  public static boolean canPushDownLimitOffsetToGroupByTime(QueryStatement 
queryStatement) {
+    if (queryStatement.isGroupByTime()
+        && !queryStatement.isAlignByDevice()
+        && !queryStatement.hasHaving()
+        && !queryStatement.hasFill()) {
+      return !queryStatement.hasOrderBy() || 
queryStatement.isOrderByBasedOnTime();
+    }
+    return false;
+  }
+
+  public static void pushDownLimitOffsetToTimeParameter(QueryStatement 
queryStatement) {
+    GroupByTimeComponent groupByTimeComponent = 
queryStatement.getGroupByTimeComponent();
+    long startTime = groupByTimeComponent.getStartTime();
+    long endTime = groupByTimeComponent.getEndTime();
+    long step = groupByTimeComponent.getSlidingStep();
+    long interval = groupByTimeComponent.getInterval();
+
+    long size = (endTime - startTime + step - 1) / step;
+    if (size > queryStatement.getRowOffset()) {
+      long limitSize = queryStatement.getRowLimit();
+      long offsetSize = queryStatement.getRowOffset();
+      if (queryStatement.getResultTimeOrder() == Ordering.ASC) {
+        startTime = startTime + offsetSize * step;
+      } else {
+        startTime = startTime + (size - offsetSize - limitSize) * step;
+      }
+      endTime =
+          limitSize == 0
+              ? endTime
+              : Math.min(endTime, startTime + (limitSize - 1) * step + 
interval);
+      groupByTimeComponent.setEndTime(endTime);
+      groupByTimeComponent.setStartTime(startTime);
+    } else {
+      // finish the query, resultSet is empty
+      queryStatement.setResultSetEmpty(true);
+    }
+    queryStatement.setRowLimit(0);
+    queryStatement.setRowOffset(0);
+  }
+
+  // 2. push down limit/offset to group by time in align by device
+  public static boolean canPushDownLimitOffsetInGroupByTimeForDevice(
+      QueryStatement queryStatement) {
+    if (!hasLimitOffset(queryStatement)) {
+      return false;
+    }
+
+    if (queryStatement.isGroupByTime()
+        && queryStatement.isAlignByDevice()
+        && !queryStatement.hasHaving()
+        && !queryStatement.hasFill()) {
+      return !queryStatement.hasOrderBy() || 
queryStatement.isOrderByBasedOnDevice();
+    }
+    return false;
+  }
+
+  public static List<PartialPath> pushDownLimitOffsetInGroupByTimeForDevice(
+      List<PartialPath> deviceNames, QueryStatement queryStatement) {
+    GroupByTimeComponent groupByTimeComponent = 
queryStatement.getGroupByTimeComponent();
+    long startTime = groupByTimeComponent.getStartTime();
+    long endTime = groupByTimeComponent.getEndTime();
+
+    long size =
+        (endTime - startTime + groupByTimeComponent.getSlidingStep() - 1)
+            / groupByTimeComponent.getSlidingStep();
+    if (size == 0 || size * deviceNames.size() <= 
queryStatement.getRowOffset()) {
+      // resultSet is empty
+      queryStatement.setResultSetEmpty(true);
+      return deviceNames;
+    }
+
+    long limitSize = queryStatement.getRowLimit();
+    long offsetSize = queryStatement.getRowOffset();
+    List<PartialPath> optimizedDeviceNames = new ArrayList<>();
+    int startDeviceIndex = (int) (offsetSize / size);
+    int endDeviceIndex =
+        limitSize == 0
+            ? deviceNames.size() - 1
+            : (int)
+                ((limitSize - ((startDeviceIndex + 1) * size - offsetSize) + 
size - 1) / size
+                    + startDeviceIndex);
+
+    int index = 0;
+    while (index < startDeviceIndex) {
+      index++;
+    }
+    queryStatement.setRowOffset(offsetSize - startDeviceIndex * size);
+
+    // if only refer to one device, optimize the time parameter
+    if (startDeviceIndex == endDeviceIndex) {
+      optimizedDeviceNames.add(deviceNames.get(startDeviceIndex));
+      if (hasLimitOffset(queryStatement) && 
queryStatement.isOrderByTimeInDevices()) {
+        pushDownLimitOffsetToTimeParameter(queryStatement);
+      }
+    } else {
+      while (index <= endDeviceIndex && index < deviceNames.size()) {
+        optimizedDeviceNames.add(deviceNames.get(index));
+        index++;
+      }
+    }
+    return optimizedDeviceNames;
+  }
+
+  private static boolean hasLimitOffset(QueryStatement queryStatement) {
+    return queryStatement.hasLimit() || queryStatement.hasOffset();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 7d74eeed2de..72ab30672b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -226,6 +226,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_RESULT_NODES;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetToGroupByTime;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetToTimeParameter;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.CAST_FUNCTION;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.CAST_TYPE;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.REPLACE_FROM;
@@ -1360,6 +1362,11 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       queryStatement.setFillComponent(parseFillClause(ctx.fillClause()));
     }
 
+    // parse ALIGN BY
+    if (ctx.alignByClause() != null) {
+      queryStatement.setResultSetFormat(parseAlignBy(ctx.alignByClause()));
+    }
+
     if (ctx.paginationClause() != null) {
       // parse SLIMIT & SOFFSET
       if (ctx.paginationClause().seriesPaginationClause() != null) {
@@ -1383,14 +1390,12 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
           queryStatement.setRowOffset(
               
parseOffsetClause(ctx.paginationClause().rowPaginationClause().offsetClause()));
         }
+        if (canPushDownLimitOffsetToGroupByTime(queryStatement)) {
+          pushDownLimitOffsetToTimeParameter(queryStatement);
+        }
       }
     }
 
-    // parse ALIGN BY
-    if (ctx.alignByClause() != null) {
-      queryStatement.setResultSetFormat(parseAlignBy(ctx.alignByClause()));
-    }
-
     queryStatement.setUseWildcard(useWildcard);
     queryStatement.setLastLevelUseWildcard(lastLevelUseWildcard);
     return queryStatement;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 372afb78488..9f28ec21026 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -51,7 +51,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnriched
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
@@ -89,12 +88,11 @@ import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * This visitor is used to generate a logical plan for the statement and 
returns the {@link
@@ -135,11 +133,9 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     }
 
     if (queryStatement.isAlignByDevice()) {
-      Map<String, PlanNode> deviceToSubPlanMap =
-          queryStatement.getResultDeviceOrder() == Ordering.ASC
-              ? new TreeMap<>()
-              : new TreeMap<>(Collections.reverseOrder());
-      for (String deviceName : 
analysis.getDeviceToSourceExpressions().keySet()) {
+      Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+      for (PartialPath device : analysis.getDeviceList()) {
+        String deviceName = device.getFullPath();
         LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis, 
context);
         subPlanBuilder =
             subPlanBuilder.withNewRoot(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
index 7419d20e5ab..8774db378f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
@@ -124,12 +124,8 @@ public class OrderByComponent extends StatementNode {
     return sortItemList.get(deviceOrderPriority).getOrdering();
   }
 
-  public boolean isDeviceOrderInitialized() {
-    return deviceOrderPriority != -1;
-  }
-
-  public boolean isTimeOrderInitialized() {
-    return timeOrderPriority != -1;
+  public int getTimeOrderPriority() {
+    return timeOrderPriority;
   }
 
   public String toSQLString() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
index 8179eda9fbd..ae363342256 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
@@ -118,6 +118,11 @@ public class QueryStatement extends Statement {
   // can use statistics to skip
   private boolean lastLevelUseWildcard = false;
 
+  // used in limit/offset push down optimizer, if the result set is empty 
after pushing down in
+  // ASTVisitor,
+  // we can skip the query
+  private boolean isResultSetEmpty = false;
+
   public QueryStatement() {
     this.statementType = StatementType.QUERY;
   }
@@ -195,6 +200,14 @@ public class QueryStatement extends Statement {
     this.rowOffset = rowOffset;
   }
 
+  public boolean isResultSetEmpty() {
+    return isResultSetEmpty;
+  }
+
+  public void setResultSetEmpty(boolean resultSetEmpty) {
+    isResultSetEmpty = resultSetEmpty;
+  }
+
   public long getSeriesLimit() {
     return seriesLimit;
   }
@@ -341,6 +354,13 @@ public class QueryStatement extends Statement {
     return orderByComponent != null && orderByComponent.isOrderByTime();
   }
 
+  public boolean isOrderByTimeInDevices() {
+    return orderByComponent == null
+        || (orderByComponent.isBasedOnDevice()
+            && (orderByComponent.getSortItemList().size() == 1
+                || orderByComponent.getTimeOrderPriority() == 1));
+  }
+
   public boolean isOrderByTimeseries() {
     return orderByComponent != null && orderByComponent.isOrderByTimeseries();
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
index afbef764c10..ff2b70a124b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
@@ -33,8 +33,11 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
+import 
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.Assert;
@@ -296,4 +299,272 @@ public class LimitOffsetPushDownTest {
     Assert.assertEquals(
         actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis, context));
   }
+
+  // test for limit/offset push down in group by time
+  @Test
+  public void testGroupByTimePushDown() {
+    String sql = "select avg(s1),sum(s2) from root.** group by ((1, 899], 200ms) offset 1 limit 2";
+    checkGroupByTimePushDown(sql, 201, 601, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown2() {
+    String sql = "select avg(s1),sum(s2) from root.** group by ([4, 899), 200ms) offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 404, 899, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown3() {
+    String sql = "select avg(s1),sum(s2) from root.** group by ([4, 899), 88ms) offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 180, 444, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown4() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 88ms) order by time offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 180, 444, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown5() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 88ms) order by time desc offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 532, 796, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown6() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 100ms) order by time desc offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 404, 704, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown7() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms) order by time desc offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 654, 804, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown8() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([0, 900), 100ms) order by time desc offset 2 limit 2";
+    checkGroupByTimePushDown(sql, 500, 700, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown9() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms) order by s1 offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 4, 899, 3, 2);
+  }
+
+  @Test
+  public void testGroupByTimePushDown10() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms, 25ms) offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 54, 154, 0, 0);
+  }
+
+  @Test
+  public void testGroupByTimePushDown11() {
+    String sql =
+        "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms, 75ms) offset 2 limit 3";
+    checkGroupByTimePushDown(sql, 154, 354, 0, 0);
+  }
+
+  private void checkGroupByTimePushDown(
+      String sql, long startTime, long endTime, long rowLimit, long rowOffset) {
+    QueryStatement queryStatement =
+        (QueryStatement) StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+    Assert.assertEquals(rowLimit, queryStatement.getRowLimit());
+    Assert.assertEquals(rowOffset, queryStatement.getRowOffset());
+
+    GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
+    Assert.assertEquals(startTime, groupByTimeComponent.getStartTime());
+    Assert.assertEquals(endTime, groupByTimeComponent.getEndTime());
+  }
+
+  // device: [root.sg.d1, root.sg.d2, root.sg.d2.a]
+  private void checkGroupByTimePushDownInAlignByDevice(
+      String sql,
+      List<String> deviceSet,
+      long rowLimit,
+      long rowOffset,
+      long startTime,
+      long endTime) {
+    QueryStatement statement =
+        (QueryStatement) StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+
+    Assert.assertEquals(rowLimit, statement.getRowLimit());
+    Assert.assertEquals(rowOffset, statement.getRowOffset());
+
+    int index = 0;
+    List<PartialPath> deviceSetInAnalysis = analysis.getDeviceList();
+    for (PartialPath path : deviceSetInAnalysis) {
+      Assert.assertEquals(path.getFullPath(), deviceSet.get(index));
+      index++;
+    }
+
+    GroupByTimeParameter groupByTimeParameter = analysis.getGroupByTimeParameter();
+    Assert.assertEquals(startTime, groupByTimeParameter.getStartTime());
+    Assert.assertEquals(endTime, groupByTimeParameter.getEndTime());
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 899), 50ms) offset 16 limit 2 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d1");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 804, 899);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice2() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 899), 50ms) offset 16 limit 10 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d1");
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 10, 16, 4, 899);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice3() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 899), 50ms) offset 20 limit 2 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 104, 204);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice4() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 899), 50ms) offset 33 limit 5 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    deviceSet.add("root.sg.d2.a");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 5, 15, 4, 899);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice5() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) offset 9 limit 5 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 29, 179);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice6() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) offset 9 limit 9 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    deviceSet.add("root.sg.d2.a");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice7() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) offset 9 limit 9 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    deviceSet.add("root.sg.d2.a");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice8() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device offset 9 limit 9 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    deviceSet.add("root.sg.d2.a");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice9() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device desc offset 9 limit 9 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    deviceSet.add("root.sg.d1");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice10() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device, time desc offset 9 limit 5 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 54, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice11() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device desc, time desc offset 9 limit 5 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 54, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice12() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 899), 50ms) order by device desc offset 16 limit 2 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2.a");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 804, 899);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice13() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device, avg(s1) desc offset 9 limit 5 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 5, 1, 4, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice14() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device, avg(s1) desc,time desc offset 9 limit 5 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 5, 1, 4, 199);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice15() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device desc limit 1 offset 8 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 4, 54);
+  }
+
+  @Test
+  public void testGroupByTimePushDownInAlignByDevice16() {
+    String sql =
+        "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by device desc offset 8 align by device";
+    List<String> deviceSet = new ArrayList<>();
+    deviceSet.add("root.sg.d2");
+    deviceSet.add("root.sg.d1");
+    checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 4, 199);
+  }
 }


Reply via email to