This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_mergesort
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/agg_mergesort by this 
push:
     new c8e5e9e1285 add AggMergeSortNode
c8e5e9e1285 is described below

commit c8e5e9e1285dde2a3cec5ad784bfd4652e37b023
Author: Beyyes <[email protected]>
AuthorDate: Tue Feb 6 00:05:45 2024 +0800

    add AggMergeSortNode
---
 .../operator/process/AggMergeSortOperator.java     |  17 ++--
 .../operator/process/DeviceViewOperator.java       |   3 +
 .../plan/planner/LogicalPlanBuilder.java           |  83 +++++++++++------
 .../planner/distribution/ExchangeNodeAdder.java    |  14 ++-
 .../plan/node/process/AggMergeSortNode.java        | 102 ++++++++++++++-------
 5 files changed, 146 insertions(+), 73 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
index 8616079e069..14348f90512 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
@@ -21,7 +21,7 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.process;
 
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -42,8 +41,8 @@ import java.util.List;
  * it to the form we need, adding the device column and allocating value 
column to its expected
  * location, then get the next device operator until no next device.
  *
- * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that 
have not transformed
- * the result form.
+ * <p>The deviceOperators can be aggregationSeriesScanOperator, 
timeJoinOperator or
+ * seriesScanOperator that have not transformed the result form.
  *
  * <p>Attention! If some columns are not existing in one device, those columns 
will be null. e.g.
  * [s1,s2,s3] is query, but only [s1, s3] exists in device1, then the column 
of s2 will be filled
@@ -119,6 +118,10 @@ public class AggMergeSortOperator implements 
ProcessOperator {
       return null;
     }
 
+    boolean deviceView =
+        getCurDeviceOperator() instanceof DeviceViewOperator
+            || getCurDeviceOperator() instanceof ExchangeOperator;
+
     TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
     if (tsBlock == null) {
       return null;
@@ -128,11 +131,11 @@ public class AggMergeSortOperator implements 
ProcessOperator {
     // fill existing columns
     Column[] newValueColumns = new Column[dataTypes.size()];
     for (int i = 0; i < indexes.size(); i++) {
-      newValueColumns[indexes.get(i)] = tsBlock.getColumn(i);
+      newValueColumns[indexes.get(i)] = tsBlock.getColumn(deviceView ? i + 1 : 
i);
     }
     // construct device column
     ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
-    deviceColumnBuilder.writeBinary(new Binary(getCurDeviceName(), 
TSFileConfig.STRING_CHARSET));
+    deviceColumnBuilder.writeBinary(tsBlock.getColumn(0).getBinary(0));
     newValueColumns[0] =
         new RunLengthEncodedColumn(deviceColumnBuilder.build(), 
tsBlock.getPositionCount());
     // construct other null columns
@@ -146,7 +149,7 @@ public class AggMergeSortOperator implements 
ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    return deviceIndex < devices.size();
+    return deviceIndex < deviceOperators.size();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
index 0dd6329ac40..122b271096b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
@@ -84,6 +84,9 @@ public class DeviceViewOperator implements ProcessOperator {
   }
 
   private Operator getCurDeviceOperator() {
+    if (deviceIndex >= deviceOperators.size()) {
+      System.out.printf("aaa");
+    }
     return deviceOperators.get(deviceIndex);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 4871df6af9d..aee63275d50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -806,14 +806,14 @@ public class LogicalPlanBuilder {
             valueFilterLimit);
       } else {
         // has order by expression, use TopKNode + DeviceViewNode
+
         topKNode.addChild(
             addDeviceViewNode(
                 orderByParameter,
                 outputColumnNames,
                 deviceToMeasurementIndexesMap,
                 deviceNameToSourceNodesMap,
-                valueFilterLimit,
-                queryStatement.isAggregationQuery()));
+                valueFilterLimit));
       }
 
       analysis.setUseTopKNode();
@@ -832,14 +832,23 @@ public class LogicalPlanBuilder {
       this.root = mergeSortNode;
     } else {
       // order by based on device, use DeviceViewNode
-      this.root =
-          addDeviceViewNode(
-              orderByParameter,
-              outputColumnNames,
-              deviceToMeasurementIndexesMap,
-              deviceNameToSourceNodesMap,
-              -1,
-              queryStatement.isAggregationQuery());
+      if (queryStatement.isAggregationQuery()) {
+        this.root =
+            addAggMergeSortNode(
+                orderByParameter,
+                outputColumnNames,
+                deviceToMeasurementIndexesMap,
+                deviceNameToSourceNodesMap,
+                -1);
+      } else {
+        this.root =
+            addDeviceViewNode(
+                orderByParameter,
+                outputColumnNames,
+                deviceToMeasurementIndexesMap,
+                deviceNameToSourceNodesMap,
+                -1);
+      }
     }
 
     context.getTypeProvider().setType(DEVICE, TSDataType.TEXT);
@@ -914,29 +923,18 @@ public class LogicalPlanBuilder {
     }
   }
 
-  private DeviceViewNode addDeviceViewNode(
+  private MultiChildProcessNode addDeviceViewNode(
       OrderByParameter orderByParameter,
       List<String> outputColumnNames,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap,
       Map<String, PlanNode> deviceNameToSourceNodesMap,
-      long valueFilterLimit,
-      boolean isAggregation) {
-    DeviceViewNode deviceViewNode;
-    if (isAggregation) {
-      deviceViewNode =
-          new AggMergeSortNode(
-              context.getQueryId().genPlanNodeId(),
-              orderByParameter,
-              outputColumnNames,
-              deviceToMeasurementIndexesMap);
-    } else {
-      deviceViewNode =
-          new DeviceViewNode(
-              context.getQueryId().genPlanNodeId(),
-              orderByParameter,
-              outputColumnNames,
-              deviceToMeasurementIndexesMap);
-    }
+      long valueFilterLimit) {
+    DeviceViewNode deviceViewNode =
+        new DeviceViewNode(
+            context.getQueryId().genPlanNodeId(),
+            orderByParameter,
+            outputColumnNames,
+            deviceToMeasurementIndexesMap);
 
     for (Map.Entry<String, PlanNode> entry : 
deviceNameToSourceNodesMap.entrySet()) {
       String deviceName = entry.getKey();
@@ -952,6 +950,33 @@ public class LogicalPlanBuilder {
     return deviceViewNode;
   }
 
+  private MultiChildProcessNode addAggMergeSortNode(
+      OrderByParameter orderByParameter,
+      List<String> outputColumnNames,
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap,
+      Map<String, PlanNode> deviceNameToSourceNodesMap,
+      long valueFilterLimit) {
+    AggMergeSortNode aggMergeSortNode =
+        new AggMergeSortNode(
+            context.getQueryId().genPlanNodeId(),
+            orderByParameter,
+            outputColumnNames,
+            deviceToMeasurementIndexesMap);
+
+    for (Map.Entry<String, PlanNode> entry : 
deviceNameToSourceNodesMap.entrySet()) {
+      String deviceName = entry.getKey();
+      PlanNode subPlan = entry.getValue();
+      if (valueFilterLimit > 0) {
+        LimitNode limitNode =
+            new LimitNode(context.getQueryId().genPlanNodeId(), subPlan, 
valueFilterLimit);
+        aggMergeSortNode.addChildDeviceNode(deviceName, limitNode);
+      } else {
+        aggMergeSortNode.addChildDeviceNode(deviceName, subPlan);
+      }
+    }
+    return aggMergeSortNode;
+  }
+
   public LogicalPlanBuilder planGroupByLevel(
       Map<Expression, Set<Expression>> groupByLevelExpressions,
       GroupByTimeParameter groupByTimeParameter,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 8e6fed5a02a..03d02d58e8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -516,8 +516,8 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     Map<TRegionReplicaSet, DeviceViewNode> regionTopKNodeMap = new HashMap<>();
     for (PlanNode child : visitedChildren) {
       TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
-      regionTopKNodeMap
-          .computeIfAbsent(
+      DeviceViewNode deviceViewNode =
+          regionTopKNodeMap.computeIfAbsent(
               region,
               k -> {
                 DeviceViewNode childDeviceViewNode =
@@ -525,13 +525,17 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
                         context.queryContext.getQueryId().genPlanNodeId(),
                         aggMergeSortNode.getMergeOrderParameter(),
                         aggMergeSortNode.getOutputColumnNames(),
-                        aggMergeSortNode.getDeviceToMeasurementIndexesMap());
+                        new HashMap<>());
                 context.putNodeDistribution(
                     childDeviceViewNode.getPlanNodeId(),
                     new 
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
                 return childDeviceViewNode;
-              })
-          .addChild(child);
+              });
+      String device = ((SeriesAggregationScanNode) 
child).getSeriesPath().getDevice();
+      deviceViewNode
+          .getDeviceToMeasurementIndexesMap()
+          .put(device, 
aggMergeSortNode.getDeviceToMeasurementIndexesMap().get(device));
+      deviceViewNode.addChildDeviceNode(device, child);
     }
 
     for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry : 
regionTopKNodeMap.entrySet()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
index ad1f2797db7..7ead4883973 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
@@ -35,14 +35,31 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class AggMergeSortNode extends DeviceViewNode {
+public class AggMergeSortNode extends MultiChildProcessNode {
+
+  // The result output order, which could sort by device and time.
+  // The size of this list is 2 and the first SortItem in this list has higher 
priority.
+  protected final OrderByParameter mergeOrderParameter;
+
+  // The size of devices and children should be the same.
+  protected final List<String> devices = new ArrayList<>();
+
+  // Device column and measurement columns in result output
+  protected final List<String> outputColumnNames;
+
+  // e.g. [s1,s2,s3] is queried, but only [s1, s3] exist in device1, then device1 -> 
[1, 3]; s1 is 1 rather than
+  // 0 because device is the first column.
+  final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
 
   public AggMergeSortNode(
       PlanNodeId id,
       OrderByParameter mergeOrderParameter,
       List<String> outputColumnNames,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
-    super(id, mergeOrderParameter, outputColumnNames, 
deviceToMeasurementIndexesMap);
+    super(id);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumnNames = outputColumnNames;
+    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
   }
 
   public AggMergeSortNode(
@@ -51,7 +68,11 @@ public class AggMergeSortNode extends DeviceViewNode {
       List<String> outputColumnNames,
       List<String> devices,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
-    super(id, mergeOrderParameter, outputColumnNames, devices, 
deviceToMeasurementIndexesMap);
+    super(id);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumnNames = outputColumnNames;
+    this.devices.addAll(devices);
+    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
   }
 
   @Override
@@ -74,6 +95,39 @@ public class AggMergeSortNode extends DeviceViewNode {
     return outputColumnNames;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    AggMergeSortNode that = (AggMergeSortNode) o;
+    return mergeOrderParameter.equals(that.mergeOrderParameter)
+        && devices.equals(that.devices)
+        && outputColumnNames.equals(that.outputColumnNames)
+        && 
deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        mergeOrderParameter,
+        devices,
+        outputColumnNames,
+        deviceToMeasurementIndexesMap);
+  }
+
+  @Override
+  public String toString() {
+    return "AggMergeSort-" + this.getPlanNodeId();
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
@@ -118,7 +172,7 @@ public class AggMergeSortNode extends DeviceViewNode {
     }
   }
 
-  public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
+  public static AggMergeSortNode deserialize(ByteBuffer byteBuffer) {
     OrderByParameter mergeOrderParameter = 
OrderByParameter.deserialize(byteBuffer);
     int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
     List<String> outputColumnNames = new ArrayList<>();
@@ -146,40 +200,24 @@ public class AggMergeSortNode extends DeviceViewNode {
       mapSize--;
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new DeviceViewNode(
+    return new AggMergeSortNode(
         planNodeId, mergeOrderParameter, outputColumnNames, devices, 
deviceToMeasurementIndexesMap);
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    DeviceViewNode that = (DeviceViewNode) o;
-    return mergeOrderParameter.equals(that.mergeOrderParameter)
-        && devices.equals(that.devices)
-        && outputColumnNames.equals(that.outputColumnNames)
-        && 
deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
+  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
+    this.devices.add(deviceName);
+    this.children.add(childNode);
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(
-        super.hashCode(),
-        mergeOrderParameter,
-        devices,
-        outputColumnNames,
-        deviceToMeasurementIndexesMap);
+  public List<String> getDevices() {
+    return devices;
   }
 
-  @Override
-  public String toString() {
-    return "AggMergeSort-" + this.getPlanNodeId();
+  public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+    return deviceToMeasurementIndexesMap;
+  }
+
+  public OrderByParameter getMergeOrderParameter() {
+    return mergeOrderParameter;
   }
 }

Reply via email to