This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch memoryDistribution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f7f2ac2affa58e9f9332798d7db9a9867265f249 Author: Alima777 <[email protected]> AuthorDate: Thu Dec 22 16:39:00 2022 +0800 Fix memory distribution bug --- .../plan/planner/MemoryDistributionCalculator.java | 48 +++++++--------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java index fb117804a7..5120e73647 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java @@ -80,18 +80,10 @@ import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode public class MemoryDistributionCalculator extends PlanVisitor<Void, MemoryDistributionCalculator.MemoryDistributionContext> { /** This map is used to calculate the total split of memory */ - private final Map<PlanNodeId, List<PlanNodeId>> exchangeMap; - - public MemoryDistributionCalculator() { - this.exchangeMap = new HashMap<>(); - } + private int exchangeNum; public long calculateTotalSplit() { - long res = 0; - for (List<PlanNodeId> l : exchangeMap.values()) { - res += l.size(); - } - return res; + return exchangeNum; } @Override @@ -127,29 +119,17 @@ public class MemoryDistributionCalculator }); } + /** + * We do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update + */ @Override public Void visitExchange(ExchangeNode node, MemoryDistributionContext context) { - // we do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update - // the map - if (context == null) { - // context == null means this ExchangeNode has no father - exchangeMap - .computeIfAbsent(node.getPlanNodeId(), x -> new ArrayList<>()) - .add(node.getPlanNodeId()); - } else { - if (context.memoryDistributionType.equals( - MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) { - exchangeMap - .computeIfAbsent(context.planNodeId, x -> new ArrayList<>()) - .add(node.getPlanNodeId()); - } else if (context.memoryDistributionType.equals( - MemoryDistributionType.CONSUME_CHILDREN_ONE_BY_ONE) - && !exchangeMap.containsKey(context.planNodeId)) { - // All children share one split, thus only one node needs to be put into the map - exchangeMap - .computeIfAbsent(context.planNodeId, x -> new ArrayList<>()) - .add(node.getPlanNodeId()); - } + // context == null means this ExchangeNode doesn't have a father + if (context == null || context.memoryDistributionType.equals(MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) { + exchangeNum++; + } else if (!context.exchangeAdded){ + context.exchangeAdded = true; + exchangeNum++; } return null; } @@ -159,10 +139,9 @@ public class MemoryDistributionCalculator // LocalSinkHandle and LocalSourceHandle are one-to-one mapped and only LocalSourceHandle do the // update if (!isSameNode(node.getDownStreamEndpoint())) { - exchangeMap - .computeIfAbsent(node.getDownStreamPlanNodeId(), x -> new ArrayList<>()) - .add(node.getDownStreamPlanNodeId()); + exchangeNum++; } + node.getChild().accept(this, context); return null; } @@ -479,6 +458,7 @@ public class MemoryDistributionCalculator static class MemoryDistributionContext { final PlanNodeId planNodeId; + boolean exchangeAdded = false; final MemoryDistributionType memoryDistributionType; MemoryDistributionContext(
