This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d63c07a6ea87fd9e072c805f5b7b9e72ee543744
Author: Beyyes <[email protected]>
AuthorDate: Sat Mar 15 15:05:02 2025 +0800

    optimize the logic of get ReplicaSets of devices
---
 .../distribute/TableDistributedPlanGenerator.java  | 92 +++++++++++++++++++---
 1 file changed, 80 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 90391cf8f4e..d117279451f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -22,8 +22,12 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
@@ -81,6 +85,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Pair;
 
 import javax.annotation.Nonnull;
@@ -534,12 +539,7 @@ public class TableDistributedPlanGenerator
       for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
         boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
         Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode> 
pair =
-            tableScanNodeMap.get(regionReplicaSet);
-
-        if (pair == null) {
-          pair = new Pair<>(null, null);
-          tableScanNodeMap.put(regionReplicaSet, pair);
-        }
+            tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new 
Pair<>(null, null));
 
         if (pair.left == null && aligned) {
           TreeAlignedDeviceViewScanNode scanNode =
@@ -719,16 +719,29 @@ public class TableDistributedPlanGenerator
   @Override
   public List<PlanNode> visitAggregationTableScan(
       AggregationTableScanNode node, PlanContext context) {
-    boolean needSplit = false;
+    String dbName =
+        node instanceof AggregationTreeDeviceViewScanNode
+            ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
+            : node.getQualifiedObjectName().getDatabaseName();
+    DataPartition dataPartition = analysis.getDataPartition();
+    if (dataPartition == null || 
!dataPartition.getDataPartitionMap().containsKey(dbName)) {
+      throw new SemanticException(
+          String.format("Given queried database: %s is not exist!", dbName));
+    }
+
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap =
+        dataPartition.getDataPartitionMap().get(dbName);
+    Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new 
HashMap<>();
     List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
+    boolean needSplit = false;
     for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
       List<TRegionReplicaSet> regionReplicaSets =
-          analysis.getDataRegionReplicaSetWithTimeFilter(
-              node instanceof AggregationTreeDeviceViewScanNode
-                  ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
-                  : node.getQualifiedObjectName().getDatabaseName(),
+          getReplicaSetWithTimeFilter(
+              dataPartition,
+              seriesSlotMap,
               deviceEntry.getDeviceID(),
-              node.getTimeFilter());
+              node.getTimeFilter(),
+              cachedSeriesSlotWithRegions);
       if (regionReplicaSets.size() > 1) {
         needSplit = true;
       }
@@ -799,6 +812,61 @@ public class TableDistributedPlanGenerator
     return resultTableScanNodeList;
   }
 
+  private List<TRegionReplicaSet> getReplicaSetWithTimeFilter(
+      DataPartition dataPartition,
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap,
+      IDeviceID deviceId,
+      Filter timeFilter,
+      Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions) {
+
+    // given seriesPartitionSlot has already been calculated
+    final TSeriesPartitionSlot seriesPartitionSlot = 
dataPartition.calculateDeviceGroupId(deviceId);
+    if 
(cachedSeriesSlotWithRegions.containsKey(seriesPartitionSlot.getSlotId())) {
+      return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+    }
+
+    if (!seriesSlotMap.containsKey(seriesPartitionSlot)) {
+      cachedSeriesSlotWithRegions.put(
+          seriesPartitionSlot.getSlotId(), 
Collections.singletonList(NOT_ASSIGNED));
+      return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+    }
+
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> timeSlotMap =
+        seriesSlotMap.get(seriesPartitionSlot);
+    if (timeSlotMap.size() == 1) {
+      TTimePartitionSlot timePartitionSlot = 
timeSlotMap.keySet().iterator().next();
+      if (timeFilter == null
+          || TimePartitionUtils.satisfyPartitionStartTime(
+              timeFilter, timePartitionSlot.startTime)) {
+        cachedSeriesSlotWithRegions.put(
+            seriesPartitionSlot.getSlotId(), 
timeSlotMap.values().iterator().next());
+        return timeSlotMap.values().iterator().next();
+      } else {
+        cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(), 
Collections.emptyList());
+        return Collections.emptyList();
+      }
+    }
+
+    Set<TRegionReplicaSet> resultSet = new HashSet<>();
+    for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry : 
timeSlotMap.entrySet()) {
+      TTimePartitionSlot timePartitionSlot = entry.getKey();
+      if (TimePartitionUtils.satisfyPartitionStartTime(timeFilter, 
timePartitionSlot.startTime)) {
+        resultSet.addAll(entry.getValue());
+      }
+    }
+    List<TRegionReplicaSet> resultList = new ArrayList<>(resultSet);
+    cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(), 
resultList);
+    return resultList;
+    //    return seriesSlotMap.get(seriesPartitionSlot).entrySet().stream()
+    //        .filter(
+    //            entry ->
+    //                TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
+    // entry.getKey().startTime))
+    //        .flatMap(entry -> entry.getValue().stream())
+    //        .distinct()
+    //        .collect(toList());
+  }
+
   @Override
   public List<PlanNode> visitEnforceSingleRow(EnforceSingleRowNode node, 
PlanContext context) {
     return dealWithPlainSingleChildNode(node, context);

Reply via email to