This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d63c07a6ea87fd9e072c805f5b7b9e72ee543744 Author: Beyyes <[email protected]> AuthorDate: Sat Mar 15 15:05:02 2025 +0800 optimize the logic of get ReplicaSets of devices --- .../distribute/TableDistributedPlanGenerator.java | 92 +++++++++++++++++++--- 1 file changed, 80 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 90391cf8f4e..d117279451f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -22,8 +22,12 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator; @@ -81,6 +85,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Pair; import javax.annotation.Nonnull; @@ -534,12 +539,7 @@ public class TableDistributedPlanGenerator for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) { boolean aligned = deviceEntry instanceof AlignedDeviceEntry; Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode> pair = - tableScanNodeMap.get(regionReplicaSet); - - if (pair == null) { - pair = new Pair<>(null, null); - tableScanNodeMap.put(regionReplicaSet, pair); - } + tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new Pair<>(null, null)); if (pair.left == null && aligned) { TreeAlignedDeviceViewScanNode scanNode = @@ -719,16 +719,29 @@ public class TableDistributedPlanGenerator @Override public List<PlanNode> visitAggregationTableScan( AggregationTableScanNode node, PlanContext context) { - boolean needSplit = false; + String dbName = + node instanceof AggregationTreeDeviceViewScanNode + ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName() + : node.getQualifiedObjectName().getDatabaseName(); + DataPartition dataPartition = analysis.getDataPartition(); + if (dataPartition == null || !dataPartition.getDataPartitionMap().containsKey(dbName)) { + throw new SemanticException( + String.format("Given queried database: %s is not exist!", dbName)); + } + + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap = + dataPartition.getDataPartitionMap().get(dbName); + Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new HashMap<>(); List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>(); + boolean needSplit = false; for (DeviceEntry deviceEntry : node.getDeviceEntries()) { List<TRegionReplicaSet> regionReplicaSets = - analysis.getDataRegionReplicaSetWithTimeFilter( - node instanceof AggregationTreeDeviceViewScanNode - ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName() - : node.getQualifiedObjectName().getDatabaseName(), + getReplicaSetWithTimeFilter( + dataPartition, + seriesSlotMap, deviceEntry.getDeviceID(), - node.getTimeFilter()); + node.getTimeFilter(), + cachedSeriesSlotWithRegions); if (regionReplicaSets.size() > 1) { needSplit = true; } @@ -799,6 +812,61 @@ public class TableDistributedPlanGenerator return resultTableScanNodeList; } + private List<TRegionReplicaSet> getReplicaSetWithTimeFilter( + DataPartition dataPartition, + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap, + IDeviceID deviceId, + Filter timeFilter, + Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions) { + + // given seriesPartitionSlot has already been calculated + final TSeriesPartitionSlot seriesPartitionSlot = dataPartition.calculateDeviceGroupId(deviceId); + if (cachedSeriesSlotWithRegions.containsKey(seriesPartitionSlot.getSlotId())) { + return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId()); + } + + if (!seriesSlotMap.containsKey(seriesPartitionSlot)) { + cachedSeriesSlotWithRegions.put( + seriesPartitionSlot.getSlotId(), Collections.singletonList(NOT_ASSIGNED)); + return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId()); + } + + Map<TTimePartitionSlot, List<TRegionReplicaSet>> timeSlotMap = + seriesSlotMap.get(seriesPartitionSlot); + if (timeSlotMap.size() == 1) { + TTimePartitionSlot timePartitionSlot = timeSlotMap.keySet().iterator().next(); + if (timeFilter == null + || TimePartitionUtils.satisfyPartitionStartTime( + timeFilter, timePartitionSlot.startTime)) { + cachedSeriesSlotWithRegions.put( + seriesPartitionSlot.getSlotId(), timeSlotMap.values().iterator().next()); + return timeSlotMap.values().iterator().next(); + } else { + cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(), Collections.emptyList()); + return Collections.emptyList(); + } + } + + Set<TRegionReplicaSet> resultSet = new HashSet<>(); + for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry : timeSlotMap.entrySet()) { + TTimePartitionSlot timePartitionSlot = entry.getKey(); + if (TimePartitionUtils.satisfyPartitionStartTime(timeFilter, timePartitionSlot.startTime)) { + resultSet.addAll(entry.getValue()); + } + } + List<TRegionReplicaSet> resultList = new ArrayList<>(resultSet); + cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(), resultList); + return resultList; + // return seriesSlotMap.get(seriesPartitionSlot).entrySet().stream() + // .filter( + // entry -> + // TimePartitionUtils.satisfyPartitionStartTime(timeFilter, + // entry.getKey().startTime)) + // .flatMap(entry -> entry.getValue().stream()) + // .distinct() + // .collect(toList()); + } + @Override public List<PlanNode> visitEnforceSingleRow(EnforceSingleRowNode node, PlanContext context) { return dealWithPlainSingleChildNode(node, context);
