This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 98f397c4c49 Optimize the logic of get ReplicaSets for devices
98f397c4c49 is described below

commit 98f397c4c49668d4370baefbcfa336f3180a9741
Author: Beyyes <[email protected]>
AuthorDate: Sat Mar 15 21:36:28 2025 +0800

    Optimize the logic of get ReplicaSets for devices
---
 .../distribute/TableDistributedPlanGenerator.java  | 104 +++++++++++++++++----
 1 file changed, 87 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 90391cf8f4e..f18d9b398fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -22,8 +22,12 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
@@ -81,6 +85,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Pair;
 
 import javax.annotation.Nonnull;
@@ -534,12 +539,7 @@ public class TableDistributedPlanGenerator
       for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
         boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
         Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode> 
pair =
-            tableScanNodeMap.get(regionReplicaSet);
-
-        if (pair == null) {
-          pair = new Pair<>(null, null);
-          tableScanNodeMap.put(regionReplicaSet, pair);
-        }
+            tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new 
Pair<>(null, null));
 
         if (pair.left == null && aligned) {
           TreeAlignedDeviceViewScanNode scanNode =
@@ -719,20 +719,35 @@ public class TableDistributedPlanGenerator
   @Override
   public List<PlanNode> visitAggregationTableScan(
       AggregationTableScanNode node, PlanContext context) {
+    String dbName =
+        node instanceof AggregationTreeDeviceViewScanNode
+            ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
+            : node.getQualifiedObjectName().getDatabaseName();
+    DataPartition dataPartition = analysis.getDataPartition();
     boolean needSplit = false;
     List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
-    for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
-      List<TRegionReplicaSet> regionReplicaSets =
-          analysis.getDataRegionReplicaSetWithTimeFilter(
-              node instanceof AggregationTreeDeviceViewScanNode
-                  ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
-                  : node.getQualifiedObjectName().getDatabaseName(),
-              deviceEntry.getDeviceID(),
-              node.getTimeFilter());
-      if (regionReplicaSets.size() > 1) {
-        needSplit = true;
+    if (dataPartition == null) {
+      // do nothing
+    } else if (!dataPartition.getDataPartitionMap().containsKey(dbName)) {
+      throw new SemanticException(
+          String.format("Given queried database: %s is not exist!", dbName));
+    } else {
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap =
+          dataPartition.getDataPartitionMap().get(dbName);
+      Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new 
HashMap<>();
+      for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
+        List<TRegionReplicaSet> regionReplicaSets =
+            getReplicaSetWithTimeFilter(
+                dataPartition,
+                seriesSlotMap,
+                deviceEntry.getDeviceID(),
+                node.getTimeFilter(),
+                cachedSeriesSlotWithRegions);
+        if (regionReplicaSets.size() > 1) {
+          needSplit = true;
+        }
+        regionReplicaSetsList.add(regionReplicaSets);
       }
-      regionReplicaSetsList.add(regionReplicaSets);
     }
 
     if (regionReplicaSetsList.isEmpty()) {
@@ -799,6 +814,61 @@ public class TableDistributedPlanGenerator
     return resultTableScanNodeList;
   }
 
+  private List<TRegionReplicaSet> getReplicaSetWithTimeFilter(
+      DataPartition dataPartition,
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap,
+      IDeviceID deviceId,
+      Filter timeFilter,
+      Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions) {
+
+    // given seriesPartitionSlot has already been calculated
+    final TSeriesPartitionSlot seriesPartitionSlot = 
dataPartition.calculateDeviceGroupId(deviceId);
+    if 
(cachedSeriesSlotWithRegions.containsKey(seriesPartitionSlot.getSlotId())) {
+      return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+    }
+
+    if (!seriesSlotMap.containsKey(seriesPartitionSlot)) {
+      cachedSeriesSlotWithRegions.put(
+          seriesPartitionSlot.getSlotId(), 
Collections.singletonList(NOT_ASSIGNED));
+      return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+    }
+
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> timeSlotMap =
+        seriesSlotMap.get(seriesPartitionSlot);
+    if (timeSlotMap.size() == 1) {
+      TTimePartitionSlot timePartitionSlot = 
timeSlotMap.keySet().iterator().next();
+      if (timeFilter == null
+          || TimePartitionUtils.satisfyPartitionStartTime(
+              timeFilter, timePartitionSlot.startTime)) {
+        cachedSeriesSlotWithRegions.put(
+            seriesPartitionSlot.getSlotId(), 
timeSlotMap.values().iterator().next());
+        return timeSlotMap.values().iterator().next();
+      } else {
+        cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(), 
Collections.emptyList());
+        return Collections.emptyList();
+      }
+    }
+
+    Set<TRegionReplicaSet> resultSet = new HashSet<>();
+    for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry : 
timeSlotMap.entrySet()) {
+      TTimePartitionSlot timePartitionSlot = entry.getKey();
+      if (TimePartitionUtils.satisfyPartitionStartTime(timeFilter, 
timePartitionSlot.startTime)) {
+        resultSet.addAll(entry.getValue());
+      }
+    }
+    List<TRegionReplicaSet> resultList = new ArrayList<>(resultSet);
+    cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(), 
resultList);
+    return resultList;
+    //    return seriesSlotMap.get(seriesPartitionSlot).entrySet().stream()
+    //        .filter(
+    //            entry ->
+    //                TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
+    // entry.getKey().startTime))
+    //        .flatMap(entry -> entry.getValue().stream())
+    //        .distinct()
+    //        .collect(toList());
+  }
+
   @Override
   public List<PlanNode> visitEnforceSingleRow(EnforceSingleRowNode node, 
PlanContext context) {
     return dealWithPlainSingleChildNode(node, context);

Reply via email to