This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_groupbytime
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/agg_groupbytime by 
this push:
     new 231e09e018 add groupbytime parameter and scanorder when distribution 
planning
231e09e018 is described below

commit 231e09e018d007b3cde4bc74a2b36d292418aae1
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri May 27 12:28:42 2022 +0800

    add groupbytime parameter and scanorder when distribution planning
---
 .../plan/planner/distribution/SourceRewriter.java  | 13 +++++++--
 .../planner/plan/node/process/AggregationNode.java |  5 ----
 .../source/AlignedSeriesAggregationScanNode.java   | 12 ---------
 .../node/source/SeriesAggregationScanNode.java     | 12 ---------
 .../node/source/SeriesAggregationSourceNode.java   | 31 ++++++++++++++++++++++
 5 files changed, 42 insertions(+), 31 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 65d4d6552f..72c1819c80 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -321,7 +321,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
 
     AggregationNode aggregationNode =
         new AggregationNode(
-            context.queryContext.getQueryId().genPlanNodeId(), 
rootAggDescriptorList);
+            context.queryContext.getQueryId().genPlanNodeId(),
+            rootAggDescriptorList,
+            node.getGroupByTimeParameter(),
+            node.getScanOrder());
     for (TRegionReplicaSet dataRegion : dataDistribution) {
       SeriesAggregationScanNode split = (SeriesAggregationScanNode) 
node.clone();
       split.setAggregationDescriptorList(leafAggDescriptorList);
@@ -470,9 +473,15 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                         descriptor.getInputExpressions()));
               });
     }
+    checkArgument(
+        sources.size() > 0, "Aggregation sources should not be empty when 
distribution planning");
+    SeriesAggregationSourceNode seed = sources.get(0);
     AggregationNode aggregationNode =
         new AggregationNode(
-            context.queryContext.getQueryId().genPlanNodeId(), 
rootAggDescriptorList);
+            context.queryContext.getQueryId().genPlanNodeId(),
+            rootAggDescriptorList,
+            seed.getGroupByTimeParameter(),
+            seed.getScanOrder());
 
     final boolean[] addParent = {false};
     sourceGroup.forEach(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index bfaf165b36..f2286dc21c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -76,11 +76,6 @@ public class AggregationNode extends MultiChildNode {
     this.children = children;
   }
 
-  @Deprecated
-  public AggregationNode(PlanNodeId id, List<AggregationDescriptor> 
aggregationDescriptorList) {
-    this(id, aggregationDescriptorList, null, OrderBy.TIMESTAMP_ASC);
-  }
-
   public List<AggregationDescriptor> getAggregationDescriptorList() {
     return aggregationDescriptorList;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 8afe5104b6..050597073b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -50,18 +50,6 @@ public class AlignedSeriesAggregationScanNode extends 
SeriesAggregationSourceNod
   // The paths of the target series which will be aggregated.
   private final AlignedPath alignedPath;
 
-  // The order to traverse the data.
-  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-  private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
-
-  // time filter for current series, could be null if doesn't exist
-  @Nullable private Filter timeFilter;
-
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  @Nullable private GroupByTimeParameter groupByTimeParameter;
-
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index 6a77e02ae0..77b0eff6d8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -64,18 +64,6 @@ public class SeriesAggregationScanNode extends 
SeriesAggregationSourceNode {
   // The path of the target series which will be aggregated.
   private final MeasurementPath seriesPath;
 
-  // The order to traverse the data.
-  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-  private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
-
-  // time filter for current series, could be null if doesn't exist
-  @Nullable private Filter timeFilter;
-
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  @Nullable private GroupByTimeParameter groupByTimeParameter;
-
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
index f9e1200ea6..d0087b2303 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
@@ -21,6 +21,11 @@ package 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import javax.annotation.Nullable;
 
 import java.util.List;
 
@@ -30,6 +35,18 @@ public abstract class SeriesAggregationSourceNode extends 
SeriesSourceNode {
   // result TsBlock
   protected List<AggregationDescriptor> aggregationDescriptorList;
 
+  // The order to traverse the data.
+  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+  protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
+
+  // time filter for current series, could be null if doesn't exist
+  @Nullable protected Filter timeFilter;
+
+  // The parameter of `group by time`
+  // Its value will be null if there is no `group by time` clause,
+  @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
   public SeriesAggregationSourceNode(
       PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
     super(id);
@@ -43,4 +60,18 @@ public abstract class SeriesAggregationSourceNode extends 
SeriesSourceNode {
   public void setAggregationDescriptorList(List<AggregationDescriptor> 
aggregationDescriptorList) {
     this.aggregationDescriptorList = aggregationDescriptorList;
   }
+
+  public OrderBy getScanOrder() {
+    return scanOrder;
+  }
+
+  @Nullable
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  @Nullable
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
 }

Reply via email to