This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98f397c4c49 Optimize the logic of get ReplicaSets for devices
98f397c4c49 is described below
commit 98f397c4c49668d4370baefbcfa336f3180a9741
Author: Beyyes <[email protected]>
AuthorDate: Sat Mar 15 21:36:28 2025 +0800
Optimize the logic of get ReplicaSets for devices
---
.../distribute/TableDistributedPlanGenerator.java | 104 +++++++++++++++++----
1 file changed, 87 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 90391cf8f4e..f18d9b398fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -22,8 +22,12 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
@@ -81,6 +85,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
import javax.annotation.Nonnull;
@@ -534,12 +539,7 @@ public class TableDistributedPlanGenerator
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode>
pair =
- tableScanNodeMap.get(regionReplicaSet);
-
- if (pair == null) {
- pair = new Pair<>(null, null);
- tableScanNodeMap.put(regionReplicaSet, pair);
- }
+ tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new
Pair<>(null, null));
if (pair.left == null && aligned) {
TreeAlignedDeviceViewScanNode scanNode =
@@ -719,20 +719,35 @@ public class TableDistributedPlanGenerator
@Override
public List<PlanNode> visitAggregationTableScan(
AggregationTableScanNode node, PlanContext context) {
+ String dbName =
+ node instanceof AggregationTreeDeviceViewScanNode
+ ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
+ : node.getQualifiedObjectName().getDatabaseName();
+ DataPartition dataPartition = analysis.getDataPartition();
boolean needSplit = false;
List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
- for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
- List<TRegionReplicaSet> regionReplicaSets =
- analysis.getDataRegionReplicaSetWithTimeFilter(
- node instanceof AggregationTreeDeviceViewScanNode
- ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
- : node.getQualifiedObjectName().getDatabaseName(),
- deviceEntry.getDeviceID(),
- node.getTimeFilter());
- if (regionReplicaSets.size() > 1) {
- needSplit = true;
+ if (dataPartition == null) {
+ // do nothing
+ } else if (!dataPartition.getDataPartitionMap().containsKey(dbName)) {
+ throw new SemanticException(
+ String.format("Given queried database: %s is not exist!", dbName));
+ } else {
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap =
+ dataPartition.getDataPartitionMap().get(dbName);
+ Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new
HashMap<>();
+ for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
+ List<TRegionReplicaSet> regionReplicaSets =
+ getReplicaSetWithTimeFilter(
+ dataPartition,
+ seriesSlotMap,
+ deviceEntry.getDeviceID(),
+ node.getTimeFilter(),
+ cachedSeriesSlotWithRegions);
+ if (regionReplicaSets.size() > 1) {
+ needSplit = true;
+ }
+ regionReplicaSetsList.add(regionReplicaSets);
}
- regionReplicaSetsList.add(regionReplicaSets);
}
if (regionReplicaSetsList.isEmpty()) {
@@ -799,6 +814,61 @@ public class TableDistributedPlanGenerator
return resultTableScanNodeList;
}
+ private List<TRegionReplicaSet> getReplicaSetWithTimeFilter(
+ DataPartition dataPartition,
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap,
+ IDeviceID deviceId,
+ Filter timeFilter,
+ Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions) {
+
+ // given seriesPartitionSlot has already been calculated
+ final TSeriesPartitionSlot seriesPartitionSlot =
dataPartition.calculateDeviceGroupId(deviceId);
+ if
(cachedSeriesSlotWithRegions.containsKey(seriesPartitionSlot.getSlotId())) {
+ return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+ }
+
+ if (!seriesSlotMap.containsKey(seriesPartitionSlot)) {
+ cachedSeriesSlotWithRegions.put(
+ seriesPartitionSlot.getSlotId(),
Collections.singletonList(NOT_ASSIGNED));
+ return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+ }
+
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> timeSlotMap =
+ seriesSlotMap.get(seriesPartitionSlot);
+ if (timeSlotMap.size() == 1) {
+ TTimePartitionSlot timePartitionSlot =
timeSlotMap.keySet().iterator().next();
+ if (timeFilter == null
+ || TimePartitionUtils.satisfyPartitionStartTime(
+ timeFilter, timePartitionSlot.startTime)) {
+ cachedSeriesSlotWithRegions.put(
+ seriesPartitionSlot.getSlotId(),
timeSlotMap.values().iterator().next());
+ return timeSlotMap.values().iterator().next();
+ } else {
+ cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(),
Collections.emptyList());
+ return Collections.emptyList();
+ }
+ }
+
+ Set<TRegionReplicaSet> resultSet = new HashSet<>();
+ for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry :
timeSlotMap.entrySet()) {
+ TTimePartitionSlot timePartitionSlot = entry.getKey();
+ if (TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
timePartitionSlot.startTime)) {
+ resultSet.addAll(entry.getValue());
+ }
+ }
+ List<TRegionReplicaSet> resultList = new ArrayList<>(resultSet);
+ cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(),
resultList);
+ return resultList;
+ // return seriesSlotMap.get(seriesPartitionSlot).entrySet().stream()
+ // .filter(
+ // entry ->
+ // TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
+ // entry.getKey().startTime))
+ // .flatMap(entry -> entry.getValue().stream())
+ // .distinct()
+ // .collect(toList());
+ }
+
@Override
public List<PlanNode> visitEnforceSingleRow(EnforceSingleRowNode node,
PlanContext context) {
return dealWithPlainSingleChildNode(node, context);