This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 81a41fca6 IMPALA-12183: Fix cardinality clamping across aggregation 
phases
81a41fca6 is described below

commit 81a41fca6dee014f11695f553ecdb6c82a172d0a
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Jun 2 08:14:15 2023 -0700

    IMPALA-12183: Fix cardinality clamping across aggregation phases
    
    In the Impala planner, an aggregation node's cardinality is the sum of all
    its aggregation class cardinalities. An aggregation class cardinality is a
    simple multiplication of NDVs of contributing grouping columns. Since
    this simple multiplication of NDVs can be greater than the aggregation
    node's input cardinality, each aggregation class cardinality is further
    clamped at the aggregation node's input cardinality.
    
    An aggregation operator can translate into a chain of multi-phase
    aggregation plan nodes. The longest possible chain of aggregation phases is
    as follows, from the bottom to the top:
    
    1. FIRST
    2. FIRST_MERGE
    3. SECOND
    4. SECOND_MERGE
    5. TRANSPOSE
    
    FIRST_MERGE aggregation maintains its aggregation class cardinality
    clamping at its corresponding FIRST aggregation's input
    cardinality (similar relationship between SECOND_MERGE and SECOND).
    However, the SECOND aggregation was clamped at the FIRST_MERGE output
    cardinality instead of the FIRST input cardinality. This cardinality
    mispropagation can cause a cardinality explosion in the later aggregation
    phases and the node operators above them.
    
    This patch fixes the clamping of multi-phase aggregation to always look at
    the input cardinality of the FIRST aggregation node. An exception is made for
    TRANSPOSE phase of grouping set aggregation (such as ROLLUP). In that
    case, cardinality clamping will use output cardinality of child node
    right below it (either FIRST_MERGE or SECOND_MERGE) because the output
    cardinality of the whole aggregation chain can be higher than input
    cardinality of the FIRST phase.
    
    Testing:
    - Add test in card-agg.test
    - Pass core tests.
    
    Change-Id: I1d414fe56b027f887c7f901d8a6799a388b16b95
    Reviewed-on: http://gerrit.cloudera.org:8080/20009
    Reviewed-by: Aman Sinha <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/planner/AggregationNode.java | 119 ++++++++++++++++-----
 .../queries/PlannerTest/card-agg.test              | 100 +++++++++++++++++
 .../PlannerTest/partition-key-scans-default.test   |   2 +-
 .../queries/PlannerTest/partition-key-scans.test   |   2 +-
 .../queries/PlannerTest/tpcds-processing-cost.test |  16 +--
 .../queries/PlannerTest/tpcds/tpcds-q14a.test      |  16 +--
 .../queries/PlannerTest/tpcds/tpcds-q18.test       |  16 +--
 .../queries/PlannerTest/tpcds/tpcds-q22.test       |  16 +--
 .../queries/PlannerTest/tpcds/tpcds-q27.test       |   6 +-
 .../queries/PlannerTest/tpcds/tpcds-q67.test       |  16 +--
 .../queries/PlannerTest/tpcds/tpcds-q98.test       |   2 +-
 11 files changed, 237 insertions(+), 74 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 2b095f02c..641d63e9c 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.CaseExpr;
@@ -91,6 +93,18 @@ public class AggregationNode extends PlanNode {
   // The output is from a non-correlated scalar subquery returning at most one 
value.
   protected boolean isNonCorrelatedScalarSubquery_ = false;
 
+  // The aggregate input cardinality of this node.
+  // Initialized in computeStats(). Depending on the aggPhase_ of this node 
and value of
+  // multiAggInfo_.getIsGroupingSet(), it can be initialized with either the 
input
+  // cardinality of the related FIRST phase aggregation or this node's 
immediate input
+  // cardinality (this child's cardinality).
+  private long aggInputCardinality_ = -1;
+
+  // Hold the estimated number of groups that will be produced by each 
aggregation class.
+  // Initialized in computeStats(). May contain -1 if an aggregation class num 
group
+  // can not be estimated.
+  private List<Long> aggClassNumGroups_;
+
   public AggregationNode(
       PlanNodeId id, PlanNode input, MultiAggregateInfo multiAggInfo, AggPhase 
aggPhase) {
     super(id, "AGGREGATE");
@@ -224,17 +238,25 @@ public class AggregationNode extends PlanNode {
     // TODO: IMPALA-2945: this doesn't correctly take into account duplicate 
keys on
     // multiple nodes in a pre-aggregation.
     cardinality_ = 0;
+    aggInputCardinality_ = getAggInputCardinality();
+    Preconditions.checkState(aggInputCardinality_ >= -1, aggInputCardinality_);
+    boolean unknownEstimate = false;
+    aggClassNumGroups_ = Lists.newArrayList();
     for (AggregateInfo aggInfo : aggInfos_) {
       // Compute the cardinality for this set of grouping exprs.
       long numGroups = estimateNumGroups(aggInfo);
       Preconditions.checkState(numGroups >= -1, numGroups);
+      aggClassNumGroups_.add(numGroups);
       if (numGroups == -1) {
         // No estimate of the number of groups is possible, can't even make a
         // conservative estimate.
-        cardinality_ = -1;
-        break;
+        unknownEstimate = true;
+      } else {
+        cardinality_ = checkedAdd(cardinality_, numGroups);
       }
-      cardinality_ = checkedAdd(cardinality_, numGroups);
+    }
+    if (unknownEstimate) {
+      cardinality_ = -1;
     }
 
     // Take conjuncts into account.
@@ -257,11 +279,10 @@ public class AggregationNode extends PlanNode {
     // limit the potential overestimation. We could, in future, improve this 
further
     // by recognizing functional dependencies.
     List<Expr> groupingExprs = aggInfo.getGroupingExprs();
-    long aggInputCardinality = getAggInputCardinality();
-    long numGroups = estimateNumGroups(groupingExprs, aggInputCardinality);
+    long numGroups = estimateNumGroups(groupingExprs, aggInputCardinality_);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Node " + id_ + " numGroups= " + numGroups + " 
aggInputCardinality=" +
-          aggInputCardinality + " for agg class " + aggInfo.debugString());
+      LOG.trace("Node " + id_ + " numGroups= " + numGroups + " 
aggInputCardinality="
+          + aggInputCardinality_ + " for agg class " + aggInfo.debugString());
     }
     return numGroups;
   }
@@ -293,22 +314,44 @@ public class AggregationNode extends PlanNode {
   }
 
   /**
-   * Compute the input cardinality to the distributed aggregation. If this is a
-   * merge aggregation, we need to find the cardinality of the input to the
-   * preaggregation.
-   * Return -1 if unknown.
+   * Compute the input cardinality to the distributed aggregation. This 
searches down
+   * for the FIRST phase aggregation node, and returns the minimum between 
input
+   * cardinality of that FIRST phase aggregation vs this node's input 
cardinality.
+   * However, if this is a TRANSPOSE phase of grouping set aggregation (such 
as ROLLUP),
+   * it immediately returns this node's input cardinality since output 
cardinality of
+   * the whole aggregation chain can be higher than input cardinality of the 
FIRST phase
+   * aggregation. Return -1 if unknown.
    */
   private long getAggInputCardinality() {
-    PlanNode child = getChild(0);
-    if (!aggPhase_.isMerge()) return child.getCardinality();
-    PlanNode preAgg;
+    long inputCardinality = getChild(0).getCardinality();
+    if (multiAggInfo_.getIsGroupingSet() && aggPhase_ == AggPhase.TRANSPOSE) {
+      return inputCardinality;
+    }
+    AggregationNode firstAgg = this;
+    while (firstAgg.getAggPhase() != AggPhase.FIRST) {
+      firstAgg = getPrevAggNode(firstAgg);
+    }
+    return Math.min(inputCardinality, firstAgg.getChild(0).getCardinality());
+  }
+
+  private AggregationNode getPrevAggNode(AggregationNode aggNode) {
+    Preconditions.checkArgument(aggNode.getAggPhase() != AggPhase.FIRST);
+    PlanNode child = aggNode.getChild(0);
     if (child instanceof ExchangeNode) {
-      preAgg = child.getChild(0);
-    } else {
-      preAgg = child;
+      child = child.getChild(0);
     }
-    Preconditions.checkState(preAgg instanceof AggregationNode);
-    return preAgg.getChild(0).getCardinality();
+    Preconditions.checkState(child instanceof AggregationNode);
+    return (AggregationNode) child;
+  }
+
+  private @Nullable AggregationNode getPrevAggInputNode() {
+    AggregationNode prevAgg = null;
+    if (aggPhase_ != AggPhase.FIRST && aggPhase_ != AggPhase.TRANSPOSE) {
+      prevAgg = getPrevAggNode(this);
+      Preconditions.checkState(aggInfos_.size() == prevAgg.aggInfos_.size());
+      Preconditions.checkState(aggInfos_.size() == 
prevAgg.aggClassNumGroups_.size());
+    }
+    return prevAgg;
   }
 
   /**
@@ -506,12 +549,30 @@ public class AggregationNode extends PlanNode {
     return output;
   }
 
+  private long getAggClassNumGroup(
+      @Nullable AggregationNode prevAgg, AggregateInfo aggInfo) {
+    if (prevAgg == null) {
+      return aggInputCardinality_;
+    } else {
+      // This aggInfo should be in-line with aggInfo of prevAgg
+      // (sizes are validated in getPrevAggInputNode).
+      int aggIdx = aggInfos_.indexOf(aggInfo);
+      long aggClassNumGroup = prevAgg.aggClassNumGroups_.get(aggIdx);
+      if (aggClassNumGroup <= -1) {
+        return aggInputCardinality_;
+      } else {
+        return Math.min(aggInputCardinality_, aggClassNumGroup);
+      }
+    }
+  }
+
   @Override
   public void computeProcessingCost(TQueryOptions queryOptions) {
     processingCost_ = ProcessingCost.zero();
+    AggregationNode prevAgg = getPrevAggInputNode();
     for (AggregateInfo aggInfo : aggInfos_) {
-      ProcessingCost aggCost =
-          aggInfo.computeProcessingCost(getDisplayLabel(), 
getChild(0).getCardinality());
+      ProcessingCost aggCost = aggInfo.computeProcessingCost(
+          getDisplayLabel(), getAggClassNumGroup(prevAgg, aggInfo));
       processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost);
     }
   }
@@ -520,8 +581,10 @@ public class AggregationNode extends PlanNode {
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size());
     resourceProfiles_.clear();
+    AggregationNode prevAgg = getPrevAggInputNode();
     for (AggregateInfo aggInfo : aggInfos_) {
-      resourceProfiles_.add(computeAggClassResourceProfile(queryOptions, 
aggInfo));
+      resourceProfiles_.add(computeAggClassResourceProfile(
+          queryOptions, aggInfo, getAggClassNumGroup(prevAgg, aggInfo)));
     }
     if (aggInfos_.size() == 1) {
       nodeResourceProfile_ = resourceProfiles_.get(0);
@@ -534,7 +597,7 @@ public class AggregationNode extends PlanNode {
   }
 
   private ResourceProfile computeAggClassResourceProfile(
-      TQueryOptions queryOptions, AggregateInfo aggInfo) {
+      TQueryOptions queryOptions, AggregateInfo aggInfo, long 
inputCardinality) {
     Preconditions.checkNotNull(
         fragment_, "PlanNode must be placed into a fragment before calling 
this method.");
     long perInstanceCardinality = 
fragment_.getPerInstanceNdv(aggInfo.getGroupingExprs());
@@ -543,8 +606,8 @@ public class AggregationNode extends PlanNode {
     if (perInstanceCardinality == -1) {
       perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
     } else {
-      // Per-instance cardinality cannot be greater than the total input 
cardinality.
-      long inputCardinality = getChild(0).getCardinality();
+      // Per-instance cardinality cannot be greater than the total input 
cardinality
+      // going into this aggregation class.
       if (inputCardinality != -1) {
         // Calculate the input cardinality distributed across fragment 
instances.
         long numInstances = fragment_.getNumInstances();
@@ -555,12 +618,12 @@ public class AggregationNode extends PlanNode {
             // multiple fragment instances.
             // This number was derived using empirical analysis of real-world
             // and benchmark (tpch, tpcds) queries.
-            perInstanceInputCardinality =
-              (long) Math.ceil((inputCardinality / numInstances) * 
DEFAULT_SKEW_FACTOR);
+            perInstanceInputCardinality = (long) Math.ceil(
+                ((double) inputCardinality / numInstances) * 
DEFAULT_SKEW_FACTOR);
           } else {
             // The data is distributed through hash, it will be more balanced.
             perInstanceInputCardinality =
-              (long) Math.ceil(inputCardinality / numInstances);
+                (long) Math.ceil((double) inputCardinality / numInstances);
           }
         } else {
           // When numInstances is 1 or unknown(-1), 
perInstanceInputCardinality is the
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test
index 7576e037a..6d4f58a78 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/card-agg.test
@@ -311,6 +311,106 @@ PLAN-ROOT SINK
    HDFS partitions=1/1 files=1 size=718.94MB
    row-size=62B cardinality=6.00M
 ====
+# Similar to the test above, but with per-class cardinality estimates higher
+# than the aggregation input cardinality.
+select l_partkey, l_suppkey, count(distinct l_quantity), count(distinct 
l_shipmode), sum(1)
+from tpch_parquet.lineitem
+group by l_partkey, l_suppkey
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid(2,4,5) = 2, count(l_quantity)), 
aggif(valid_tid(2,4,5) = 4, count(l_shipmode)), aggif(valid_tid(2,4,5) = 5, 
sum(1))
+|  group by: CASE valid_tid(2,4,5) WHEN 2 THEN l_partkey WHEN 4 THEN l_partkey 
WHEN 5 THEN l_partkey END, CASE valid_tid(2,4,5) WHEN 2 THEN l_suppkey WHEN 4 
THEN l_suppkey WHEN 5 THEN l_suppkey END
+|  row-size=40B cardinality=6.00M
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(l_quantity)
+|    group by: l_partkey, l_suppkey
+|  Class 1
+|    output: count(l_shipmode)
+|    group by: l_partkey, l_suppkey
+|  Class 2
+|    output: sum:merge(1)
+|    group by: l_partkey, l_suppkey
+|  row-size=72B cardinality=18.00M
+|
+01:AGGREGATE
+|  Class 0
+|    group by: l_partkey, l_suppkey, l_quantity
+|  Class 1
+|    group by: l_partkey, l_suppkey, l_shipmode
+|  Class 2
+|    output: sum(1)
+|    group by: l_partkey, l_suppkey
+|  row-size=80B cardinality=18.00M
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   row-size=40B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid(2,4,5) = 2, count(l_quantity)), 
aggif(valid_tid(2,4,5) = 4, count(l_shipmode)), aggif(valid_tid(2,4,5) = 5, 
sum(1))
+|  group by: CASE valid_tid(2,4,5) WHEN 2 THEN l_partkey WHEN 4 THEN l_partkey 
WHEN 5 THEN l_partkey END, CASE valid_tid(2,4,5) WHEN 2 THEN l_suppkey WHEN 4 
THEN l_suppkey WHEN 5 THEN l_suppkey END
+|  row-size=40B cardinality=6.00M
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(l_quantity)
+|    group by: l_partkey, l_suppkey
+|  Class 1
+|    output: count:merge(l_shipmode)
+|    group by: l_partkey, l_suppkey
+|  Class 2
+|    output: sum:merge(1)
+|    group by: l_partkey, l_suppkey
+|  row-size=72B cardinality=18.00M
+|
+06:EXCHANGE [HASH(CASE valid_tid(2,4,5) WHEN 2 THEN murmur_hash(l_partkey) 
WHEN 4 THEN murmur_hash(l_partkey) WHEN 5 THEN murmur_hash(l_partkey) END,CASE 
valid_tid(2,4,5) WHEN 2 THEN murmur_hash(l_suppkey) WHEN 4 THEN 
murmur_hash(l_suppkey) WHEN 5 THEN murmur_hash(l_suppkey) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(l_quantity)
+|    group by: l_partkey, l_suppkey
+|  Class 1
+|    output: count(l_shipmode)
+|    group by: l_partkey, l_suppkey
+|  Class 2
+|    output: sum:merge(1)
+|    group by: l_partkey, l_suppkey
+|  row-size=72B cardinality=18.00M
+|
+05:AGGREGATE
+|  Class 0
+|    group by: l_partkey, l_suppkey, l_quantity
+|  Class 1
+|    group by: l_partkey, l_suppkey, l_shipmode
+|  Class 2
+|    output: sum:merge(1)
+|    group by: l_partkey, l_suppkey
+|  row-size=80B cardinality=18.00M
+|
+04:EXCHANGE [HASH(CASE valid_tid(1,3,5) WHEN 1 THEN murmur_hash(l_partkey) 
WHEN 3 THEN murmur_hash(l_partkey) WHEN 5 THEN murmur_hash(l_partkey) END,CASE 
valid_tid(1,3,5) WHEN 1 THEN murmur_hash(l_suppkey) WHEN 3 THEN 
murmur_hash(l_suppkey) WHEN 5 THEN murmur_hash(l_suppkey) END,CASE 
valid_tid(1,3,5) WHEN 1 THEN murmur_hash(l_quantity) WHEN 3 THEN 
murmur_hash(l_shipmode) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: l_partkey, l_suppkey, l_quantity
+|  Class 1
+|    group by: l_partkey, l_suppkey, l_shipmode
+|  Class 2
+|    output: sum(1)
+|    group by: l_partkey, l_suppkey
+|  row-size=80B cardinality=18.00M
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   row-size=40B cardinality=6.00M
+====
 # Mixed grouping and non-grouping aggregations.
 select count(distinct id), count(distinct int_col), count(*)
 from functional.alltypes
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans-default.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans-default.test
index c210ae3a6..e3c1409d5 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans-default.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans-default.test
@@ -432,7 +432,7 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(2,4) = 2, count('test')), aggif(valid_tid(2,4) = 4, 
count(1234))
 |  group by: CASE valid_tid(2,4) WHEN 2 THEN 1234 WHEN 4 THEN 1234 END
-|  row-size=18B cardinality=2
+|  row-size=18B cardinality=1
 |
 02:AGGREGATE [FINALIZE]
 |  Class 0
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
index 232d07440..1130922eb 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/partition-key-scans.test
@@ -374,7 +374,7 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(2,4) = 2, count('test')), aggif(valid_tid(2,4) = 4, 
count(1234))
 |  group by: CASE valid_tid(2,4) WHEN 2 THEN 1234 WHEN 4 THEN 1234 END
-|  row-size=18B cardinality=2
+|  row-size=18B cardinality=1
 |
 02:AGGREGATE [FINALIZE]
 |  Class 0
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
index 94307dbfe..390e58354 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
@@ -270,8 +270,8 @@ with  cross_items as
  order by channel,i_brand_id,i_class_id,i_category_id
 LIMIT 100
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=790.00MB Threads=163
-Per-Host Resource Estimates: Memory=2.42GB
+Max Per-Host Resource Reservation: Memory=716.38MB Threads=163
+Per-Host Resource Estimates: Memory=2.36GB
 F80:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=4.03MB mem-reservation=4.00MB 
thread-reservation=1
 |  max-parallelism=1 segment-costs=[606] cpu-comparison-result=157 [max(1 
(self) vs 157 (sum children))]
@@ -287,8 +287,8 @@ PLAN-ROOT SINK
 |  in pipelines: 129(GETNEXT)
 |
 F79:PLAN FRAGMENT [HASH(CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(channel) WHEN 105 THEN murmur_hash(channel) WHEN 106 THEN 
murmur_hash(channel) WHEN 107 THEN murmur_hash(channel) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(i_brand_id) WHEN 105 THEN murmur_hash(i_brand_id) WHEN 106 THEN 
murmur_hash(i_brand_id) WHEN 107 THEN murmur_hash(NULL) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WH [...]
-Per-Instance Resources: mem-estimate=111.05MB mem-reservation=85.12MB 
thread-reservation=1
-max-parallelism=6 segment-costs=[17011772, 3936100, 400, 6] 
cpu-comparison-result=157 [max(6 (self) vs 157 (sum children))]
+Per-Instance Resources: mem-estimate=77.05MB mem-reservation=48.31MB 
thread-reservation=1
+max-parallelism=6 segment-costs=[3516572, 3936100, 400, 6] 
cpu-comparison-result=157 [max(6 (self) vs 157 (sum children))]
 129:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(104,105,106,107,108) WHEN 104 THEN channel WHEN 
105 THEN channel WHEN 106 THEN channel WHEN 107 THEN channel WHEN 108 THEN NULL 
END ASC, CASE valid_tid(104,105,106,107,108) WHEN 104 THEN i_brand_id WHEN 105 
THEN i_brand_id WHEN 106 THEN i_brand_id WHEN 107 THEN NULL WHEN 108 THEN NULL 
END ASC, CASE valid_tid(104,105,106,107,108) WHEN 104 THEN i_class_id WHEN 105 
THEN i_class_id WHEN 106 THEN NULL WHEN 107 THEN NULL WHEN 108 THEN NULL END 
ASC, CASE valid_tid(10 [...]
 |  mem-estimate=4.69KB mem-reservation=0B thread-reservation=0
@@ -318,8 +318,8 @@ max-parallelism=6 segment-costs=[17011772, 3936100, 400, 6] 
cpu-comparison-resul
 |  Class 4
 |    output: sum:merge(sales), sum:merge(number_sales)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=98.00MB mem-reservation=76.62MB thread-reservation=0
-|  tuple-ids=104N,105N,106N,107N,108N row-size=240B cardinality=562.30K 
cost=16869000
+|  mem-estimate=64.00MB mem-reservation=39.81MB thread-reservation=0
+|  tuple-ids=104N,105N,106N,107N,108N row-size=240B cardinality=562.30K 
cost=3373800
 |  in pipelines: 215(GETNEXT), 151(OPEN), 179(OPEN), 207(OPEN)
 |
 214:EXCHANGE [HASH(CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(channel) WHEN 105 THEN murmur_hash(channel) WHEN 106 THEN 
murmur_hash(channel) WHEN 107 THEN murmur_hash(channel) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(i_brand_id) WHEN 105 THEN murmur_hash(i_brand_id) WHEN 106 THEN 
murmur_hash(i_brand_id) WHEN 107 THEN murmur_hash(NULL) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WHEN 10 [...]
@@ -8007,7 +8007,7 @@ PLAN-ROOT SINK
 |
 F22:PLAN FRAGMENT [HASH(CASE valid_tid(26,27,28) WHEN 26 THEN 
murmur_hash(channel) WHEN 27 THEN murmur_hash(channel) WHEN 28 THEN 
murmur_hash(NULL) END,CASE valid_tid(26,27,28) WHEN 26 THEN murmur_hash(id) 
WHEN 27 THEN murmur_hash(NULL) WHEN 28 THEN murmur_hash(NULL) END)] hosts=3 
instances=6 (adjusted from 48)
 Per-Instance Resources: mem-estimate=40.00MB mem-reservation=7.75MB 
thread-reservation=1
-max-parallelism=6 segment-costs=[176035, 69384, 200, 8] 
cpu-comparison-result=55 [max(6 (self) vs 55 (sum children))]
+max-parallelism=6 segment-costs=[60395, 69384, 200, 8] 
cpu-comparison-result=55 [max(6 (self) vs 55 (sum children))]
 39:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(26,27,28) WHEN 26 THEN channel WHEN 27 THEN 
channel WHEN 28 THEN NULL END ASC, CASE valid_tid(26,27,28) WHEN 26 THEN id 
WHEN 27 THEN NULL WHEN 28 THEN NULL END ASC
 |  mem-estimate=7.03KB mem-reservation=0B thread-reservation=0
@@ -8032,7 +8032,7 @@ max-parallelism=6 segment-costs=[176035, 69384, 200, 8] 
cpu-comparison-result=55
 |    output: sum:merge(sales), sum:merge(`returns`), sum:merge(profit)
 |    group by: NULL, NULL
 |  mem-estimate=30.00MB mem-reservation=5.81MB thread-reservation=0
-|  tuple-ids=26N,27N,28N row-size=216B cardinality=11.56K cost=173460
+|  tuple-ids=26N,27N,28N row-size=216B cardinality=11.56K cost=57820
 |  in pipelines: 62(GETNEXT), 46(OPEN), 53(OPEN), 60(OPEN)
 |
 61:EXCHANGE [HASH(CASE valid_tid(26,27,28) WHEN 26 THEN murmur_hash(channel) 
WHEN 27 THEN murmur_hash(channel) WHEN 28 THEN murmur_hash(NULL) END,CASE 
valid_tid(26,27,28) WHEN 26 THEN murmur_hash(id) WHEN 27 THEN murmur_hash(NULL) 
WHEN 28 THEN murmur_hash(NULL) END)]
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q14a.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q14a.test
index 6d4222ac8..a3d26ad83 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q14a.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q14a.test
@@ -1295,8 +1295,8 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=20B cardinality=2.88M
    in pipelines: 01(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=427.00MB Threads=120
-Per-Host Resource Estimates: Memory=3.56GB
+Max Per-Host Resource Reservation: Memory=424.19MB Threads=120
+Per-Host Resource Estimates: Memory=3.54GB
 F80:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -1311,7 +1311,7 @@ PLAN-ROOT SINK
 |  in pipelines: 129(GETNEXT)
 |
 F79:PLAN FRAGMENT [HASH(CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(channel) WHEN 105 THEN murmur_hash(channel) WHEN 106 THEN 
murmur_hash(channel) WHEN 107 THEN murmur_hash(channel) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(i_brand_id) WHEN 105 THEN murmur_hash(i_brand_id) WHEN 106 THEN 
murmur_hash(i_brand_id) WHEN 107 THEN murmur_hash(NULL) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WH [...]
-Per-Host Resources: mem-estimate=137.09MB mem-reservation=93.62MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=115.00MB mem-reservation=90.81MB 
thread-reservation=1
 129:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(104,105,106,107,108) WHEN 104 THEN channel WHEN 
105 THEN channel WHEN 106 THEN channel WHEN 107 THEN channel WHEN 108 THEN NULL 
END ASC, CASE valid_tid(104,105,106,107,108) WHEN 104 THEN i_brand_id WHEN 105 
THEN i_brand_id WHEN 106 THEN i_brand_id WHEN 107 THEN NULL WHEN 108 THEN NULL 
END ASC, CASE valid_tid(104,105,106,107,108) WHEN 104 THEN i_class_id WHEN 105 
THEN i_class_id WHEN 106 THEN NULL WHEN 107 THEN NULL WHEN 108 THEN NULL END 
ASC, CASE valid_tid(10 [...]
 |  mem-estimate=4.69KB mem-reservation=0B thread-reservation=0
@@ -1341,7 +1341,7 @@ Per-Host Resources: mem-estimate=137.09MB 
mem-reservation=93.62MB thread-reserva
 |  Class 4
 |    output: sum:merge(sales), sum:merge(number_sales)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=120.09MB mem-reservation=76.62MB thread-reservation=0
+|  mem-estimate=98.00MB mem-reservation=73.81MB thread-reservation=0
 |  tuple-ids=104N,105N,106N,107N,108N row-size=240B cardinality=562.30K
 |  in pipelines: 215(GETNEXT), 151(OPEN), 179(OPEN), 207(OPEN)
 |
@@ -3098,8 +3098,8 @@ Per-Host Resources: mem-estimate=67.37MB 
mem-reservation=12.81MB thread-reservat
    tuple-ids=0 row-size=20B cardinality=2.88M
    in pipelines: 01(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=662.12MB Threads=141
-Per-Host Resource Estimates: Memory=2.28GB
+Max Per-Host Resource Reservation: Memory=588.50MB Threads=141
+Per-Host Resource Estimates: Memory=2.22GB
 F80:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=4.03MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -3114,7 +3114,7 @@ PLAN-ROOT SINK
 |  in pipelines: 129(GETNEXT)
 |
 F79:PLAN FRAGMENT [HASH(CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(channel) WHEN 105 THEN murmur_hash(channel) WHEN 106 THEN 
murmur_hash(channel) WHEN 107 THEN murmur_hash(channel) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WHEN 104 THEN 
murmur_hash(i_brand_id) WHEN 105 THEN murmur_hash(i_brand_id) WHEN 106 THEN 
murmur_hash(i_brand_id) WHEN 107 THEN murmur_hash(NULL) WHEN 108 THEN 
murmur_hash(NULL) END,CASE valid_tid(104,105,106,107,108) WH [...]
-Per-Instance Resources: mem-estimate=109.52MB mem-reservation=85.12MB 
thread-reservation=1
+Per-Instance Resources: mem-estimate=75.52MB mem-reservation=48.31MB 
thread-reservation=1
 129:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(104,105,106,107,108) WHEN 104 THEN channel WHEN 
105 THEN channel WHEN 106 THEN channel WHEN 107 THEN channel WHEN 108 THEN NULL 
END ASC, CASE valid_tid(104,105,106,107,108) WHEN 104 THEN i_brand_id WHEN 105 
THEN i_brand_id WHEN 106 THEN i_brand_id WHEN 107 THEN NULL WHEN 108 THEN NULL 
END ASC, CASE valid_tid(104,105,106,107,108) WHEN 104 THEN i_class_id WHEN 105 
THEN i_class_id WHEN 106 THEN NULL WHEN 107 THEN NULL WHEN 108 THEN NULL END 
ASC, CASE valid_tid(10 [...]
 |  mem-estimate=4.69KB mem-reservation=0B thread-reservation=0
@@ -3144,7 +3144,7 @@ Per-Instance Resources: mem-estimate=109.52MB 
mem-reservation=85.12MB thread-res
 |  Class 4
 |    output: sum:merge(sales), sum:merge(number_sales)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=98.00MB mem-reservation=76.62MB thread-reservation=0
+|  mem-estimate=64.00MB mem-reservation=39.81MB thread-reservation=0
 |  tuple-ids=104N,105N,106N,107N,108N row-size=240B cardinality=562.30K
 |  in pipelines: 215(GETNEXT), 151(OPEN), 179(OPEN), 207(OPEN)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q18.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q18.test
index a2502d510..a189fb254 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q18.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q18.test
@@ -206,8 +206,8 @@ PLAN-ROOT SINK
    tuple-ids=2 row-size=4B cardinality=1.92M
    in pipelines: 02(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=194.25MB Threads=18
-Per-Host Resource Estimates: Memory=888MB
+Max Per-Host Resource Reservation: Memory=166.00MB Threads=18
+Per-Host Resource Estimates: Memory=874MB
 F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.03MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -222,7 +222,7 @@ PLAN-ROOT SINK
 |  in pipelines: 15(GETNEXT)
 |
 F09:PLAN FRAGMENT [HASH(CASE valid_tid(7,9,11,13,15) WHEN 7 THEN 
murmur_hash(i_item_id) WHEN 9 THEN murmur_hash(i_item_id) WHEN 11 THEN 
murmur_hash(i_item_id) WHEN 13 THEN murmur_hash(i_item_id) WHEN 15 THEN 
murmur_hash(NULL) END,CASE valid_tid(7,9,11,13,15) WHEN 7 THEN 
murmur_hash(ca_country) WHEN 9 THEN murmur_hash(ca_country) WHEN 11 THEN 
murmur_hash(ca_country) WHEN 13 THEN murmur_hash(NULL) WHEN 15 THEN 
murmur_hash(NULL) END,CASE valid_tid(7,9,11,13,15) WHEN 7 THEN 
murmur_hash(ca_st [...]
-Per-Host Resources: mem-estimate=75.94MB mem-reservation=57.69MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=61.94MB mem-reservation=29.44MB 
thread-reservation=1
 15:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(16,8,10,12,14) WHEN 8 THEN ca_country WHEN 10 THEN 
ca_country WHEN 12 THEN ca_country WHEN 14 THEN NULL WHEN 16 THEN NULL END ASC, 
CASE valid_tid(16,8,10,12,14) WHEN 8 THEN ca_state WHEN 10 THEN ca_state WHEN 
12 THEN NULL WHEN 14 THEN NULL WHEN 16 THEN NULL END ASC, CASE 
valid_tid(16,8,10,12,14) WHEN 8 THEN ca_county WHEN 10 THEN NULL WHEN 12 THEN 
NULL WHEN 14 THEN NULL WHEN 16 THEN NULL END ASC, CASE valid_tid(16,8,10,12,14) 
WHEN 8 THEN i_item_id WHEN 10 THEN [...]
 |  mem-estimate=10.16KB mem-reservation=0B thread-reservation=0
@@ -252,7 +252,7 @@ Per-Host Resources: mem-estimate=75.94MB 
mem-reservation=57.69MB thread-reservat
 |  Class 4
 |    output: avg:merge(CAST(cs_quantity AS DECIMAL(12,2))), 
avg:merge(CAST(cs_list_price AS DECIMAL(12,2))), avg:merge(CAST(cs_coupon_amt 
AS DECIMAL(12,2))), avg:merge(CAST(cs_sales_price AS DECIMAL(12,2))), 
avg:merge(CAST(cs_net_profit AS DECIMAL(12,2))), avg:merge(CAST(c_birth_year AS 
DECIMAL(12,2))), avg:merge(CAST(cd1.cd_dep_count AS DECIMAL(12,2)))
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=64.00MB mem-reservation=52.94MB thread-reservation=0
+|  mem-estimate=50.00MB mem-reservation=24.69MB thread-reservation=0
 |  tuple-ids=8N,10N,12N,14N,16N row-size=641B cardinality=75.61K
 |  in pipelines: 25(GETNEXT), 00(OPEN)
 |
@@ -471,8 +471,8 @@ Per-Host Resources: mem-estimate=399.42MB 
mem-reservation=29.62MB thread-reserva
    tuple-ids=0 row-size=40B cardinality=1.44M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=206.81MB Threads=17
-Per-Host Resource Estimates: Memory=384MB
+Max Per-Host Resource Reservation: Memory=178.56MB Threads=17
+Per-Host Resource Estimates: Memory=370MB
 F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=4.03MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -487,7 +487,7 @@ PLAN-ROOT SINK
 |  in pipelines: 15(GETNEXT)
 |
 F09:PLAN FRAGMENT [HASH(CASE valid_tid(7,9,11,13,15) WHEN 7 THEN 
murmur_hash(i_item_id) WHEN 9 THEN murmur_hash(i_item_id) WHEN 11 THEN 
murmur_hash(i_item_id) WHEN 13 THEN murmur_hash(i_item_id) WHEN 15 THEN 
murmur_hash(NULL) END,CASE valid_tid(7,9,11,13,15) WHEN 7 THEN 
murmur_hash(ca_country) WHEN 9 THEN murmur_hash(ca_country) WHEN 11 THEN 
murmur_hash(ca_country) WHEN 13 THEN murmur_hash(NULL) WHEN 15 THEN 
murmur_hash(NULL) END,CASE valid_tid(7,9,11,13,15) WHEN 7 THEN 
murmur_hash(ca_st [...]
-Per-Instance Resources: mem-estimate=75.94MB mem-reservation=57.69MB 
thread-reservation=1
+Per-Instance Resources: mem-estimate=61.94MB mem-reservation=29.44MB 
thread-reservation=1
 15:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(16,8,10,12,14) WHEN 8 THEN ca_country WHEN 10 THEN 
ca_country WHEN 12 THEN ca_country WHEN 14 THEN NULL WHEN 16 THEN NULL END ASC, 
CASE valid_tid(16,8,10,12,14) WHEN 8 THEN ca_state WHEN 10 THEN ca_state WHEN 
12 THEN NULL WHEN 14 THEN NULL WHEN 16 THEN NULL END ASC, CASE 
valid_tid(16,8,10,12,14) WHEN 8 THEN ca_county WHEN 10 THEN NULL WHEN 12 THEN 
NULL WHEN 14 THEN NULL WHEN 16 THEN NULL END ASC, CASE valid_tid(16,8,10,12,14) 
WHEN 8 THEN i_item_id WHEN 10 THEN [...]
 |  mem-estimate=10.16KB mem-reservation=0B thread-reservation=0
@@ -517,7 +517,7 @@ Per-Instance Resources: mem-estimate=75.94MB 
mem-reservation=57.69MB thread-rese
 |  Class 4
 |    output: avg:merge(CAST(cs_quantity AS DECIMAL(12,2))), 
avg:merge(CAST(cs_list_price AS DECIMAL(12,2))), avg:merge(CAST(cs_coupon_amt 
AS DECIMAL(12,2))), avg:merge(CAST(cs_sales_price AS DECIMAL(12,2))), 
avg:merge(CAST(cs_net_profit AS DECIMAL(12,2))), avg:merge(CAST(c_birth_year AS 
DECIMAL(12,2))), avg:merge(CAST(cd1.cd_dep_count AS DECIMAL(12,2)))
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=64.00MB mem-reservation=52.94MB thread-reservation=0
+|  mem-estimate=50.00MB mem-reservation=24.69MB thread-reservation=0
 |  tuple-ids=8N,10N,12N,14N,16N row-size=641B cardinality=75.61K
 |  in pipelines: 25(GETNEXT), 00(OPEN)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
index cd555434a..1288f9826 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
@@ -128,8 +128,8 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=20B cardinality=11.74M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=292.57MB Threads=10
-Per-Host Resource Estimates: Memory=26.12GB
+Max Per-Host Resource Reservation: Memory=288.82MB Threads=10
+Per-Host Resource Estimates: Memory=13.96GB
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -144,7 +144,7 @@ PLAN-ROOT SINK
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN 
murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN 
murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 
THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN 
murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN 
murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN 
murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash [...]
-Per-Host Resources: mem-estimate=20.49GB mem-reservation=146.44MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=8.33GB mem-reservation=142.69MB 
thread-reservation=1
 09:TOP-N [LIMIT=100]
 |  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE 
valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN 
avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN 
avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE 
valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name 
WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END 
ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
 |  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
@@ -174,7 +174,7 @@ Per-Host Resources: mem-estimate=20.49GB 
mem-reservation=146.44MB thread-reserva
 |  Class 4
 |    output: avg:merge(inv_quantity_on_hand)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=19.30GB mem-reservation=112.44MB thread-reservation=0
+|  mem-estimate=7.14GB mem-reservation=108.69MB thread-reservation=0
 |  tuple-ids=5N,7N,9N,11N,13N row-size=422B cardinality=35.25M
 |  in pipelines: 14(GETNEXT), 00(OPEN)
 |
@@ -294,8 +294,8 @@ Per-Host Resources: mem-estimate=5.50GB 
mem-reservation=140.62MB thread-reservat
    tuple-ids=0 row-size=20B cardinality=11.74M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=304.20MB Threads=9
-Per-Host Resource Estimates: Memory=25.96GB
+Max Per-Host Resource Reservation: Memory=300.45MB Threads=9
+Per-Host Resource Estimates: Memory=13.80GB
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -310,7 +310,7 @@ PLAN-ROOT SINK
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN 
murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN 
murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 
THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN 
murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN 
murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN 
murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash [...]
-Per-Instance Resources: mem-estimate=20.49GB mem-reservation=146.44MB 
thread-reservation=1
+Per-Instance Resources: mem-estimate=8.33GB mem-reservation=142.69MB 
thread-reservation=1
 09:TOP-N [LIMIT=100]
 |  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE 
valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN 
avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN 
avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE 
valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name 
WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END 
ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
 |  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
@@ -340,7 +340,7 @@ Per-Instance Resources: mem-estimate=20.49GB 
mem-reservation=146.44MB thread-res
 |  Class 4
 |    output: avg:merge(inv_quantity_on_hand)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=19.30GB mem-reservation=112.44MB thread-reservation=0
+|  mem-estimate=7.14GB mem-reservation=108.69MB thread-reservation=0
 |  tuple-ids=5N,7N,9N,11N,13N row-size=422B cardinality=35.25M
 |  in pipelines: 14(GETNEXT), 00(OPEN)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q27.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q27.test
index bcde921c2..5a75e5dc9 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q27.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q27.test
@@ -148,7 +148,7 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=36B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=52.95MB Threads=12
+Max Per-Host Resource Reservation: Memory=51.08MB Threads=12
 Per-Host Resource Estimates: Memory=370MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
@@ -164,7 +164,7 @@ PLAN-ROOT SINK
 |  in pipelines: 11(GETNEXT)
 |
 F05:PLAN FRAGMENT [HASH(CASE valid_tid(5,7,9) WHEN 5 THEN 
murmur_hash(i_item_id) WHEN 7 THEN murmur_hash(i_item_id) WHEN 9 THEN 
murmur_hash(NULL) END,CASE valid_tid(5,7,9) WHEN 5 THEN murmur_hash(s_state) 
WHEN 7 THEN murmur_hash(NULL) WHEN 9 THEN murmur_hash(NULL) END)] hosts=3 
instances=3
-Per-Host Resources: mem-estimate=40.00MB mem-reservation=9.62MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=40.00MB mem-reservation=7.75MB 
thread-reservation=1
 11:TOP-N [LIMIT=100]
 |  order by: CASE valid_tid(6,8,10) WHEN 6 THEN i_item_id WHEN 8 THEN 
i_item_id WHEN 10 THEN NULL END ASC, CASE valid_tid(6,8,10) WHEN 6 THEN s_state 
WHEN 8 THEN NULL WHEN 10 THEN NULL END ASC
 |  mem-estimate=5.57KB mem-reservation=0B thread-reservation=0
@@ -188,7 +188,7 @@ Per-Host Resources: mem-estimate=40.00MB 
mem-reservation=9.62MB thread-reservati
 |  Class 2
 |    output: avg:merge(ss_quantity), avg:merge(ss_list_price), 
avg:merge(ss_coupon_amt), avg:merge(ss_sales_price)
 |    group by: NULL, NULL
-|  mem-estimate=30.00MB mem-reservation=7.69MB thread-reservation=0
+|  mem-estimate=30.00MB mem-reservation=5.81MB thread-reservation=0
 |  tuple-ids=6N,8N,10N row-size=202B cardinality=17.71K
 |  in pipelines: 17(GETNEXT), 00(OPEN)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
index 4697fdd99..5de0877a8 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
@@ -186,8 +186,8 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=24B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=478.39MB Threads=11
-Per-Host Resource Estimates: Memory=31.93GB
+Max Per-Host Resource Reservation: Memory=477.45MB Threads=11
+Per-Host Resource Estimates: Memory=10.45GB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.03MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -237,7 +237,7 @@ Per-Host Resources: mem-estimate=4.08MB 
mem-reservation=4.00MB thread-reservatio
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN 
murmur_hash(i_category) WHEN 5 THEN murmur_hash(i_category) WHEN 6 THEN 
murmur_hash(i_category) WHEN 7 THEN murmur_hash(i_category) WHEN 8 THEN 
murmur_hash(i_category) WHEN 9 THEN murmur_hash(i_category) WHEN 10 THEN 
murmur_hash(i_category) WHEN 11 THEN murmur_hash(i_category) WHEN 12 THEN 
murmur_hash(NULL) END,CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN 
murmur_hash(i_class) WHEN 5 THEN murmur_hash(i_class) WHE [...]
-Per-Host Resources: mem-estimate=27.19GB mem-reservation=244.75MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=5.71GB mem-reservation=243.81MB 
thread-reservation=1
 09:TOP-N
 |  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 
5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN 
i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN 
i_category WHEN 12 THEN NULL END ASC NULLS LAST, 
aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE 
valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * 
ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quan [...]
 |  limit with ties: 200
@@ -281,7 +281,7 @@ Per-Host Resources: mem-estimate=27.19GB 
mem-reservation=244.75MB thread-reserva
 |  Class 8
 |    output: sum:merge(coalesce(ss_sales_price * ss_quantity, 0))
 |    group by: NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
-|  mem-estimate=26.71GB mem-reservation=210.75MB thread-reservation=0
+|  mem-estimate=5.22GB mem-reservation=209.81MB thread-reservation=0
 |  tuple-ids=4N,5N,6N,7N,8N,9N,10N,11N,12N row-size=1.07KB cardinality=15.09M
 |  in pipelines: 17(GETNEXT), 00(OPEN)
 |
@@ -414,8 +414,8 @@ Per-Host Resources: mem-estimate=4.54GB 
mem-reservation=223.62MB thread-reservat
    tuple-ids=0 row-size=24B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=950.77MB Threads=13
-Per-Host Resource Estimates: Memory=32.62GB
+Max Per-Host Resource Reservation: Memory=948.89MB Threads=13
+Per-Host Resource Estimates: Memory=10.43GB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=4.06MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
@@ -465,7 +465,7 @@ Per-Instance Resources: mem-estimate=4.13MB 
mem-reservation=4.00MB thread-reserv
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN 
murmur_hash(i_category) WHEN 5 THEN murmur_hash(i_category) WHEN 6 THEN 
murmur_hash(i_category) WHEN 7 THEN murmur_hash(i_category) WHEN 8 THEN 
murmur_hash(i_category) WHEN 9 THEN murmur_hash(i_category) WHEN 10 THEN 
murmur_hash(i_category) WHEN 11 THEN murmur_hash(i_category) WHEN 12 THEN 
murmur_hash(NULL) END,CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN 
murmur_hash(i_class) WHEN 5 THEN murmur_hash(i_class) WHE [...]
-Per-Instance Resources: mem-estimate=13.96GB mem-reservation=244.75MB 
thread-reservation=1
+Per-Instance Resources: mem-estimate=2.87GB mem-reservation=243.81MB 
thread-reservation=1
 09:TOP-N
 |  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 
5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN 
i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN 
i_category WHEN 12 THEN NULL END ASC NULLS LAST, 
aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE 
valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * 
ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quan [...]
 |  limit with ties: 200
@@ -509,7 +509,7 @@ Per-Instance Resources: mem-estimate=13.96GB 
mem-reservation=244.75MB thread-res
 |  Class 8
 |    output: sum:merge(coalesce(ss_sales_price * ss_quantity, 0))
 |    group by: NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
-|  mem-estimate=13.72GB mem-reservation=210.75MB thread-reservation=0
+|  mem-estimate=2.63GB mem-reservation=209.81MB thread-reservation=0
 |  tuple-ids=4N,5N,6N,7N,8N,9N,10N,11N,12N row-size=1.07KB cardinality=15.09M
 |  in pipelines: 17(GETNEXT), 00(OPEN)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q98.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q98.test
index 0d232e71e..c1d471054 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q98.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q98.test
@@ -133,7 +133,7 @@ PLAN-ROOT SINK
 |  in pipelines: 08(GETNEXT)
 |
 F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=116.13MB mem-reservation=50.00MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=116.14MB mem-reservation=50.00MB 
thread-reservation=1
 08:TOP-N [LIMIT=1000]
 |  order by: i_category ASC, i_class ASC, i_item_id ASC, i_item_desc ASC, 
sum(ss_ext_sales_price) * 100 / sum(sum(ss_ext_sales_price)) ASC
 |  mem-estimate=209.09KB mem-reservation=0B thread-reservation=0


Reply via email to