This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch builtin-udtf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e4bc4ee6f3fbac099bf95d2e2dd831b0ae575247 Author: Chen YZ <[email protected]> AuthorDate: Fri Feb 28 16:08:00 2025 +0800 save --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 3 +- .../distribute/TableDistributedPlanGenerator.java | 55 +++++++++++++++++++++- .../TransformAggregationToStreamable.java | 28 +++++------ 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 8fa597a9c6f..41706fbd323 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -370,7 +370,8 @@ public class IoTDBConfig { /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */ private int queryThreadCount = Runtime.getRuntime().availableProcessors(); - private int degreeOfParallelism = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + // private int degreeOfParallelism = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + private int degreeOfParallelism = 1; private int mergeThresholdOfExplainAnalyze = 10; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 8ddc35912bd..d8037df5f12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -516,12 +516,65 @@ public class TableDistributedPlanGenerator public List<PlanNode> visitDeviceTableScan( final DeviceTableScanNode node, final PlanContext context) { if (context.isPushDownGrouping()) { - return constructDeviceTableScanByTags(node, context); + // return constructDeviceTableScanByTags(node, context); + return constructDeviceTableScanTmp(node, context); } else { return constructDeviceTableScanByRegionReplicaSet(node, context); } } + private List<PlanNode> constructDeviceTableScanTmp( + final DeviceTableScanNode node, final PlanContext context) { + List<PlanNode> result = new ArrayList<>(); + final Map<TRegionReplicaSet, Integer> regionDeviceCount = new HashMap<>(); + for (final DeviceEntry deviceEntry : node.getDeviceEntries()) { + final List<TRegionReplicaSet> regionReplicaSets = + analysis.getDataRegionReplicaSetWithTimeFilter( + node.getQualifiedObjectName().getDatabaseName(), + deviceEntry.getDeviceID(), + node.getTimeFilter()); + List<PlanNode> tmp = new ArrayList<>(); + for (final TRegionReplicaSet regionReplicaSet : regionReplicaSets) { + regionDeviceCount.put( + regionReplicaSet, regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1); + DeviceTableScanNode scanNode = + new DeviceTableScanNode( + queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + new ArrayList<>(), + node.getIdAndAttributeIndexMap(), + node.getScanOrder(), + node.getTimePredicate().orElse(null), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.containsNonAlignedDevice()); + scanNode.setRegionReplicaSet(regionReplicaSet); + scanNode.appendDeviceEntry(deviceEntry); + tmp.add(scanNode); + } + if (context.hasSortProperty) { + processSortProperty(node, tmp, context); + } + if (tmp.size() == 1) { + result.add(tmp.get(0)); + } else { + CollectNode collectNode = + new CollectNode(queryId.genPlanNodeId(), tmp, node.getOutputSymbols()); + result.add(collectNode); + } + } + context.mostUsedRegion = + regionDeviceCount.entrySet().stream() + .max(Comparator.comparingInt(Map.Entry::getValue)) + .map(Map.Entry::getKey) + .orElse(null); + return result; + } + private List<PlanNode> constructDeviceTableScanByTags( final DeviceTableScanNode node, final PlanContext context) { List<PlanNode> result = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java index 6a7609e907f..60944e152d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; @@ -35,6 +36,7 @@ import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -125,20 +127,18 @@ public class TransformAggregationToStreamable implements PlanOptimizer { @Override public List<Symbol> visitTableFunctionProcessor( TableFunctionProcessorNode node, GroupContext context) { - - return ImmutableList.of(); - // if (node.getChildren().isEmpty()) { - // return ImmutableList.of(); - // } else if (node.isRowSemantic()) { - // return visitPlan(node, context); - // } - // Optional<DataOrganizationSpecification> dataOrganizationSpecification = - // node.getDataOrganizationSpecification(); - // return dataOrganizationSpecification - // .<List<Symbol>>map( - // organizationSpecification -> - // ImmutableList.copyOf(organizationSpecification.getPartitionBy())) - // .orElseGet(ImmutableList::of); + if (node.getChildren().isEmpty()) { + return ImmutableList.of(); + } else if (node.isRowSemantic()) { + return visitPlan(node, context); + } + Optional<DataOrganizationSpecification> dataOrganizationSpecification = + node.getDataOrganizationSpecification(); + return dataOrganizationSpecification + .<List<Symbol>>map( + organizationSpecification -> + ImmutableList.copyOf(organizationSpecification.getPartitionBy())) + .orElseGet(ImmutableList::of); } @Override
