Repository: incubator-impala
Updated Branches:
  refs/heads/master 8a77fff9a -> dbb0d863b


IMPALA-5160: adjust spill buffer size based on planner estimates

Scale down the buffer size in hash joins and hash aggregations if
planner estimates indicate that the data to be hashed (e.g. the build
side of a join) is small. This greatly reduces the minimum memory
requirement for joins in some common cases, e.g. joins against small
dimension tables.

Currently this is not plumbed through to the backend and only takes
effect in planner tests.
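
Both operators apply the same rule: divide the estimated per-instance
data size by the partition fanout, round the result up to a power of
two, clamp it between the minimum and the default spillable buffer
size, and multiply by the operator's minimum buffer count to get the
reservation. A minimal, hypothetical sketch of that arithmetic (class
name, constants and the zero-estimate guard are illustrative; the real
logic is in HashJoinNode/AggregationNode in the diff below):

// Sketch only: mirrors the scaling applied in the planner changes below.
public class SpillBufferSizingSketch {
  // Round 'val' up to the next power of two ('val' must be > 0).
  static long roundUpToPowerOf2(long val) {
    return 1L << (64 - Long.numberOfLeadingZeros(val - 1));
  }

  // Shrink the buffer when the estimated per-partition data would leave most
  // of a default-sized buffer empty, but never go below the backend minimum.
  static long chooseBufferSize(long defaultBytes, long minBytes,
      long perInstanceDataBytes, int partitionFanout) {
    if (perInstanceDataBytes < 0) return defaultBytes;  // no estimate available
    long bytesPerBuffer = Math.max(1, perInstanceDataBytes / partitionFanout);
    return Math.min(defaultBytes,
        Math.max(minBytes, roundUpToPowerOf2(bytesPerBuffer)));
  }

  public static void main(String[] args) {
    // A ~3KB dimension-table build side drops to a 64KB buffer, so a hash
    // join needing 17 buffers reserves ~1MB instead of 17 * 8MB = 136MB.
    System.out.println(chooseBufferSize(8L << 20, 64L << 10, 3000, 16));  // 65536
  }
}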

Testing:
Added targeted planner tests for small/mid/large/unknown memory
requirements for aggregations and joins.

Change-Id: I57b5b4c528325d478c8a9b834a6bc5dedab54b5b
Reviewed-on: http://gerrit.cloudera.org:8080/6963
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dbb0d863
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dbb0d863
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dbb0d863

Branch: refs/heads/master
Commit: dbb0d863beeff16241fae87c4de3ae4bc5ab85be
Parents: 8a77fff
Author: Tim Armstrong <[email protected]>
Authored: Mon Apr 17 17:14:55 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Jun 27 20:30:34 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/common/RuntimeEnv.java    |  17 +-
 .../apache/impala/planner/AggregationNode.java  |  43 +-
 .../apache/impala/planner/AnalyticEvalNode.java |   2 +-
 .../org/apache/impala/planner/HashJoinNode.java |  42 +-
 .../org/apache/impala/planner/PlanNode.java     |  14 +-
 .../org/apache/impala/planner/SortNode.java     |   2 +-
 .../java/org/apache/impala/util/BitUtil.java    |  32 +
 .../org/apache/impala/planner/PlannerTest.java  |  12 +
 .../apache/impala/planner/PlannerTestBase.java  |  15 +-
 .../PlannerTest/spillable-buffer-sizing.test    | 842 +++++++++++++++++++
 10 files changed, 983 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
index bfb079e..2041090 100644
--- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
+++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
@@ -17,10 +17,7 @@
 
 package org.apache.impala.common;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.impala.service.FeSupport;
+import org.apache.impala.service.BackendConfig;
 
 /**
 * Contains runtime-specific parameters such as the number of CPU cores. Currently only
@@ -32,6 +29,13 @@ public class RuntimeEnv {
 
   private int numCores_;
 
+  // The minimum size of buffer spilled to disk by spilling nodes. Used in
+  // PlanNode.computeResourceProfile(). Currently the backend only supports a single
+  // spillable buffer size, so this is equal to PlanNode.DEFAULT_SPILLABLE_BUFFER_BYTES,
+  // except in planner tests.
+  // TODO: IMPALA-3200: get this from query option
+  private long minSpillableBufferBytes_;
+
   // Indicates whether this is an environment for testing.
   private boolean isTestEnv_;
 
@@ -44,10 +48,15 @@ public class RuntimeEnv {
    */
   public void reset() {
     numCores_ = Runtime.getRuntime().availableProcessors();
+    minSpillableBufferBytes_ = BackendConfig.INSTANCE.getReadSize();
   }
 
   public int getNumCores() { return numCores_; }
   public void setNumCores(int numCores) { this.numCores_ = numCores; }
+  public long getMinSpillableBufferBytes() { return minSpillableBufferBytes_; }
+  public void setMinSpillableBufferBytes(long minSpillableBufferBytes) {
+    minSpillableBufferBytes_ = minSpillableBufferBytes;
+  }
   public void setTestEnv(boolean v) { isTestEnv_ = v; }
   public boolean isTestEnv() { return isTestEnv_; }
   public boolean isKuduSupported() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 5cfce82..d1b7419 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -30,12 +30,15 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TAggregationNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.BitUtil;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -281,6 +284,22 @@ public class AggregationNode extends PlanNode {
   public void computeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(
         fragment_, "PlanNode must be placed into a fragment before calling 
this method.");
+    long perInstanceCardinality = fragment_.getPerInstanceNdv(
+        queryOptions.getMt_dop(), aggInfo_.getGroupingExprs());
+    long perInstanceMemEstimate;
+    long perInstanceDataBytes = -1;
+    if (perInstanceCardinality == -1) {
+      perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
+    } else {
+      // Per-instance cardinality cannot be greater than the total output cardinality.
+      if (cardinality_ != -1) {
+        perInstanceCardinality = Math.min(perInstanceCardinality, cardinality_);
+      }
+      perInstanceDataBytes = (long)Math.ceil(perInstanceCardinality * avgRowSize_);
+      perInstanceMemEstimate = (long)Math.max(perInstanceDataBytes *
+          PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
+    }
+
     // Must be kept in sync with PartitionedAggregationNode::MinRequiredBuffers() in be.
     long perInstanceMinBuffers;
     if (aggInfo_.getGroupingExprs().isEmpty() || useStreamingPreagg_) {
@@ -288,22 +307,18 @@ public class AggregationNode extends PlanNode {
     } else {
       final int PARTITION_FANOUT = 16;
       long minBuffers = 2 * PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
-      perInstanceMinBuffers = SPILLABLE_BUFFER_BYTES * minBuffers;
+      long bufferSize = getDefaultSpillableBufferBytes();
+      if (perInstanceDataBytes != -1) {
+        long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT;
+        // Scale down the buffer size if we think there will be excess free space with the
+        // default buffer size, e.g. with small dimension tables.
+        bufferSize = Math.min(bufferSize, Math.max(
+            RuntimeEnv.INSTANCE.getMinSpillableBufferBytes(),
+            BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
+      }
+      perInstanceMinBuffers = bufferSize * minBuffers;
     }
 
-    long perInstanceCardinality = fragment_.getPerInstanceNdv(
-        queryOptions.getMt_dop(), aggInfo_.getGroupingExprs());
-    if (perInstanceCardinality == -1) {
-      resourceProfile_ =
-          new ResourceProfile(DEFAULT_PER_INSTANCE_MEM, perInstanceMinBuffers);
-      return;
-    }
-    // Per-instance cardinality cannot be greater than the total output cardinality.
-    if (cardinality_ != -1) {
-      perInstanceCardinality = Math.min(perInstanceCardinality, cardinality_);
-    }
-    long perInstanceMemEstimate = (long)Math.max(perInstanceCardinality * avgRowSize_ *
-        PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
     resourceProfile_ =
         new ResourceProfile(perInstanceMemEstimate, perInstanceMinBuffers);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index e0981c7..f17226b 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -249,7 +249,7 @@ public class AnalyticEvalNode extends PlanNode {
     long perInstanceMemEstimate = 0;
 
     // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in be.
-    long perInstanceMinBufferBytes = 2 * SPILLABLE_BUFFER_BYTES;
+    long perInstanceMinBufferBytes = 2 * getDefaultSpillableBufferBytes();
     resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index f819513..030f9c5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -28,12 +28,15 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TEqJoinCondition;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.THashJoinNode;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.BitUtil;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -178,23 +181,42 @@ public class HashJoinNode extends JoinNode {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    // Must be kept in sync with PartitionedHashJoinBuilder::MinRequiredBuffers() in be.
-    final int PARTITION_FANOUT = 16;
-    long minBuffers = PARTITION_FANOUT + 1
-        + (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0);
-    long perInstanceMinBufferBytes = SPILLABLE_BUFFER_BYTES * minBuffers;
-
     long perInstanceMemEstimate;
+    long perInstanceDataBytes;
+    int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
     if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
-        || numNodes_ == 0) {
+        || numInstances <= 0) {
       perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
+      perInstanceDataBytes = -1;
     } else {
-      perInstanceMemEstimate = (long) Math.ceil(getChild(1).cardinality_
-          * getChild(1).avgRowSize_ * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
+      perInstanceDataBytes = (long) Math.ceil(getChild(1).cardinality_
+          * getChild(1).avgRowSize_);
+      // Assume the rows are evenly divided among instances.
+      // TODO-MT: this estimate is not quite right with parallel plans. Fix it before
+      // we allow executing parallel plans with joins.
       if (distrMode_ == DistributionMode.PARTITIONED) {
-        perInstanceMemEstimate /= fragment_.getNumInstances(queryOptions.getMt_dop());
+        perInstanceDataBytes /= numInstances;
       }
+      perInstanceMemEstimate = (long) Math.ceil(
+          perInstanceDataBytes * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
+    }
+
+    // Must be kept in sync with PartitionedHashJoinBuilder::MinRequiredBuffers() in be.
+    final int PARTITION_FANOUT = 16;
+    long minBuffers = PARTITION_FANOUT + 1
+        + (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0);
+
+    long bufferSize = getDefaultSpillableBufferBytes();
+    if (perInstanceDataBytes != -1) {
+      long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT;
+      // Scale down the buffer size if we think there will be excess free space with the
+      // default buffer size, e.g. if the right side is a small dimension table.
+      bufferSize = Math.min(bufferSize, Math.max(
+          RuntimeEnv.INSTANCE.getMinSpillableBufferBytes(),
+          BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
     }
+
+    long perInstanceMinBufferBytes = bufferSize * minBuffers;
     resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 67d9e44..0b36922 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -33,6 +33,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExecStats;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlan;
@@ -65,10 +66,6 @@ import com.google.common.math.LongMath;
 abstract public class PlanNode extends TreeNode<PlanNode> {
   private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);
 
-  // The size of buffer used in spilling nodes. Used in computeResourceProfile().
-  // TODO: IMPALA-3200: get from query option
-  protected final static long SPILLABLE_BUFFER_BYTES = 8L * 1024L * 1024L;
-
   // String used for this node in getExplainString().
   protected String displayName_;
 
@@ -627,6 +624,15 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public abstract void computeResourceProfile(TQueryOptions queryOptions);
 
   /**
+   * The default size of buffer used in spilling nodes. Used in computeResourceProfile().
+   */
+  protected final static long getDefaultSpillableBufferBytes() {
+    // BufferedBlockMgr uses --read_size to determine buffer size.
+    // TODO: IMPALA-3200: get from query option
+    return BackendConfig.INSTANCE.getReadSize();
+  }
+
+  /**
    * The input cardinality is the sum of output cardinalities of its children.
   * For scan nodes the input cardinality is the expected number of rows scanned.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index c06d320..3517bee 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -249,7 +249,7 @@ public class SortNode extends PlanNode {
     long perInstanceMemEstimate = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
 
     // Must be kept in sync with min_buffers_required in Sorter in be.
-    long perInstanceMinReservation = 3 * SPILLABLE_BUFFER_BYTES;
+    long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes();
     if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
       perInstanceMinReservation *= 2;
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/main/java/org/apache/impala/util/BitUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/BitUtil.java b/fe/src/main/java/org/apache/impala/util/BitUtil.java
new file mode 100644
index 0000000..51b778a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/BitUtil.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+public class BitUtil {
+
+  // Returns the log2 of 'val'. 'val' must be > 0.
+  public static int log2Ceiling(long val) {
+    // Formula is based on the Long.numberOfLeadingZeros() javadoc comment.
+    return 64 - Long.numberOfLeadingZeros(val - 1);
+  }
+
+  // Round up 'val' to the nearest power of two. 'val' must be > 0.
+  public static long roundUpToPowerOf2(long val) {
+    return 1L << log2Ceiling(val);
+  }
+}
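
For reference, a quick hypothetical check of the two helpers (not part of the
commit; assumes the BitUtil class above is on the classpath):

import org.apache.impala.util.BitUtil;

public class BitUtilDemo {
  public static void main(String[] args) {
    for (long v : new long[] {1, 2, 3, 182, 4096, 4097}) {
      System.out.println(v + " -> 2^" + BitUtil.log2Ceiling(v)
          + " = " + BitUtil.roundUpToPowerOf2(v));
    }
    // Prints: 1 -> 2^0 = 1, 2 -> 2^1 = 2, 3 -> 2^2 = 4,
    //         182 -> 2^8 = 256, 4096 -> 2^12 = 4096, 4097 -> 2^13 = 8192
  }
}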

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 270df42..62c8d0d 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -401,6 +401,18 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testSpillableBufferSizing() {
+    // Tests the resource requirement computation from the planner when it is allowed to
+    // vary the spillable buffer size.
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+    // TODO: IMPALA-3200 - this should become a query option.
+    RuntimeEnv.INSTANCE.setMinSpillableBufferBytes(64 * 1024);
+    runPlannerTestFile("spillable-buffer-sizing", options, false);
+  }
+
+  @Test
   public void testSortExprMaterialization() {
     addTestFunction("TestFn", Lists.newArrayList(Type.DOUBLE), false);
     TQueryOptions options = defaultQueryOptions();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index eceaeca..6cd4e7a 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -73,6 +73,7 @@ import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduScanToken;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,10 +99,6 @@ public class PlannerTestBase extends FrontendTestBase {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    // Use 8 cores for resource estimation.
-    RuntimeEnv.INSTANCE.setNumCores(8);
-    // Set test env to control the explain level.
-    RuntimeEnv.INSTANCE.setTestEnv(true);
     // Mimic the 3 node test mini-cluster.
     TUpdateMembershipRequest updateReq = new TUpdateMembershipRequest();
     updateReq.setIp_addresses(Sets.newHashSet("127.0.0.1"));
@@ -114,6 +111,16 @@ public class PlannerTestBase extends FrontendTestBase {
     }
   }
 
+  @Before
+  public void setUpTest() throws Exception {
+    // Reset the RuntimeEnv - individual tests may change it.
+    RuntimeEnv.INSTANCE.reset();
+    // Use 8 cores for resource estimation.
+    RuntimeEnv.INSTANCE.setNumCores(8);
+    // Set test env to control the explain level.
+    RuntimeEnv.INSTANCE.setTestEnv(true);
+  }
+
   @AfterClass
   public static void cleanUp() throws Exception {
     RuntimeEnv.INSTANCE.reset();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dbb0d863/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
new file mode 100644
index 0000000..3aa2cba
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -0,0 +1,842 @@
+# Join with tiny build side - should use smallest possible buffers.
+select straight_join *
+from tpch_parquet.customer
+    inner join tpch_parquet.nation on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=1.06MB
+Per-Host Resource Estimates: Memory=24.00MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=355B cardinality=150000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  mem-estimate=3.15KB mem-reservation=1.06MB
+|  tuple-ids=0,1 row-size=355B cardinality=150000
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=117B cardinality=25
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  01:SCAN HDFS [tpch_parquet.nation, RANDOM]
+|     partitions=1/1 files=1 size=2.94KB
+|     stats-rows=25 extrapolated-rows=disabled
+|     table stats: rows=25 size=2.94KB
+|     column stats: all
+|     mem-estimate=16.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=117B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer, RANDOM]
+   partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=12.34MB
+   column stats: all
+   mem-estimate=24.00MB mem-reservation=0B
+   tuple-ids=0 row-size=238B cardinality=150000
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=2.12MB
+Per-Host Resource Estimates: Memory=80.01MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=355B cardinality=150000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  mem-estimate=3.15KB mem-reservation=1.06MB
+|  tuple-ids=0,1 row-size=355B cardinality=150000
+|
+|--F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: n_nationkey
+|  |  mem-estimate=0B mem-reservation=0B
+|  |
+|  03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=117B cardinality=25
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+|  01:SCAN HDFS [tpch_parquet.nation, RANDOM]
+|     partitions=1/1 files=1 size=2.94KB
+|     stats-rows=25 extrapolated-rows=disabled
+|     table stats: rows=25 size=2.94KB
+|     column stats: all
+|     mem-estimate=16.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=117B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer, RANDOM]
+   partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=12.34MB
+   column stats: all
+   mem-estimate=24.00MB mem-reservation=0B
+   tuple-ids=0 row-size=238B cardinality=150000
+====
+# Join with large build side - should use default-sized buffers.
+select straight_join *
+from tpch_parquet.lineitem
+    left join tpch_parquet.orders on l_orderkey = o_orderkey
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=136.00MB
+Per-Host Resource Estimates: Memory=380.41MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1N row-size=454B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  mem-estimate=300.41MB mem-reservation=136.00MB
+|  tuple-ids=0,1N row-size=454B cardinality=6001215
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=191B cardinality=1500000
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=272.00MB
+Per-Host Resource Estimates: Memory=840.83MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1N row-size=454B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: l_orderkey = o_orderkey
+|  mem-estimate=300.41MB mem-reservation=136.00MB
+|  tuple-ids=0,1N row-size=454B cardinality=6001215
+|
+|--F03:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |  mem-estimate=0B mem-reservation=0B
+|  |
+|  03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=191B cardinality=1500000
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+====
+# Shuffle join with mid-sized input.
+select straight_join *
+from tpch_parquet.orders
+    join /*+shuffle*/ tpch_parquet.customer on o_custkey = c_custkey
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=34.00MB
+Per-Host Resource Estimates: Memory=58.69MB
+
+F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+05:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+F02:PLAN FRAGMENT [HASH(o_custkey)] hosts=2 instances=2
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF000 <- c_custkey
+|  mem-estimate=18.69MB mem-reservation=34.00MB
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+|--04:EXCHANGE [HASH(c_custkey)]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=238B cardinality=150000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  01:SCAN HDFS [tpch_parquet.customer, RANDOM]
+|     partitions=1/1 files=1 size=12.34MB
+|     stats-rows=150000 extrapolated-rows=disabled
+|     table stats: rows=150000 size=12.34MB
+|     column stats: all
+|     mem-estimate=24.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=238B cardinality=150000
+|
+03:EXCHANGE [HASH(o_custkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=191B cardinality=1500000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+00:SCAN HDFS [tpch_parquet.orders, RANDOM]
+   partitions=1/1 files=2 size=54.20MB
+   runtime filters: RF000 -> o_custkey
+   stats-rows=1500000 extrapolated-rows=disabled
+   table stats: rows=1500000 size=54.20MB
+   column stats: all
+   mem-estimate=40.00MB mem-reservation=0B
+   tuple-ids=0 row-size=191B cardinality=1500000
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=34.00MB
+Per-Host Resource Estimates: Memory=146.69MB
+
+F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+05:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+F02:PLAN FRAGMENT [HASH(o_custkey)] hosts=2 instances=4
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash-table-id=00
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF000 <- c_custkey
+|  mem-estimate=9.35MB mem-reservation=17.00MB
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+|--F04:PLAN FRAGMENT [HASH(o_custkey)] hosts=1 instances=2
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: c_custkey
+|  |  mem-estimate=0B mem-reservation=0B
+|  |
+|  04:EXCHANGE [HASH(c_custkey)]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=238B cardinality=150000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+|  01:SCAN HDFS [tpch_parquet.customer, RANDOM]
+|     partitions=1/1 files=1 size=12.34MB
+|     stats-rows=150000 extrapolated-rows=disabled
+|     table stats: rows=150000 size=12.34MB
+|     column stats: all
+|     mem-estimate=24.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=238B cardinality=150000
+|
+03:EXCHANGE [HASH(o_custkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=191B cardinality=1500000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
+00:SCAN HDFS [tpch_parquet.orders, RANDOM]
+   partitions=1/1 files=2 size=54.20MB
+   runtime filters: RF000 -> o_custkey
+   stats-rows=1500000 extrapolated-rows=disabled
+   table stats: rows=1500000 size=54.20MB
+   column stats: all
+   mem-estimate=40.00MB mem-reservation=0B
+   tuple-ids=0 row-size=191B cardinality=1500000
+====
+# Broadcast join with mid-sized input - should use larger buffers than shuffle join.
+select straight_join *
+from tpch_parquet.orders
+    join /*+broadcast*/ tpch_parquet.customer on o_custkey = c_custkey
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=68.00MB
+Per-Host Resource Estimates: Memory=77.38MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF000 <- c_custkey
+|  mem-estimate=37.38MB mem-reservation=68.00MB
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=238B cardinality=150000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  01:SCAN HDFS [tpch_parquet.customer, RANDOM]
+|     partitions=1/1 files=1 size=12.34MB
+|     stats-rows=150000 extrapolated-rows=disabled
+|     table stats: rows=150000 size=12.34MB
+|     column stats: all
+|     mem-estimate=24.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=238B cardinality=150000
+|
+00:SCAN HDFS [tpch_parquet.orders, RANDOM]
+   partitions=1/1 files=2 size=54.20MB
+   runtime filters: RF000 -> o_custkey
+   stats-rows=1500000 extrapolated-rows=disabled
+   table stats: rows=1500000 size=54.20MB
+   column stats: all
+   mem-estimate=40.00MB mem-reservation=0B
+   tuple-ids=0 row-size=191B cardinality=1500000
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=136.00MB
+Per-Host Resource Estimates: Memory=202.76MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF000 <- c_custkey
+|  mem-estimate=37.38MB mem-reservation=68.00MB
+|  tuple-ids=0,1 row-size=428B cardinality=1500000
+|
+|--F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: c_custkey
+|  |  mem-estimate=0B mem-reservation=0B
+|  |
+|  03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=238B cardinality=150000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+|  01:SCAN HDFS [tpch_parquet.customer, RANDOM]
+|     partitions=1/1 files=1 size=12.34MB
+|     stats-rows=150000 extrapolated-rows=disabled
+|     table stats: rows=150000 size=12.34MB
+|     column stats: all
+|     mem-estimate=24.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=238B cardinality=150000
+|
+00:SCAN HDFS [tpch_parquet.orders, RANDOM]
+   partitions=1/1 files=2 size=54.20MB
+   runtime filters: RF000 -> o_custkey
+   stats-rows=1500000 extrapolated-rows=disabled
+   table stats: rows=1500000 size=54.20MB
+   column stats: all
+   mem-estimate=40.00MB mem-reservation=0B
+   tuple-ids=0 row-size=191B cardinality=1500000
+====
+# Join with no stats for right input - should use default buffers.
+select straight_join *
+from functional_parquet.alltypes
+    left join functional_parquet.alltypestiny on alltypes.id = alltypestiny.id
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=136.00MB
+Per-Host Resource Estimates: Memory=2.02GB
+WARNING: The following tables are missing relevant table and/or column statistics.
+functional_parquet.alltypes, functional_parquet.alltypestiny
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1N row-size=176B cardinality=unavailable
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: alltypes.id = alltypestiny.id
+|  mem-estimate=2.00GB mem-reservation=136.00MB
+|  tuple-ids=0,1N row-size=176B cardinality=unavailable
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=88B cardinality=unavailable
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  01:SCAN HDFS [functional_parquet.alltypestiny, RANDOM]
+|     partitions=4/4 files=4 size=10.48KB
+|     stats-rows=unavailable extrapolated-rows=disabled
+|     table stats: rows=unavailable size=unavailable
+|     column stats: unavailable
+|     mem-estimate=16.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=88B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   partitions=24/24 files=24 size=178.13KB
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
+   columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   mem-estimate=16.00MB mem-reservation=0B
+   tuple-ids=0 row-size=88B cardinality=unavailable
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=272.00MB
+Per-Host Resource Estimates: Memory=4.06GB
+WARNING: The following tables are missing relevant table and/or column statistics.
+functional_parquet.alltypestiny
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1N row-size=176B cardinality=unavailable
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: alltypes.id = alltypestiny.id
+|  mem-estimate=2.00GB mem-reservation=136.00MB
+|  tuple-ids=0,1N row-size=176B cardinality=unavailable
+|
+|--F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: alltypestiny.id
+|  |  mem-estimate=0B mem-reservation=0B
+|  |
+|  03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=88B cardinality=unavailable
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+|  01:SCAN HDFS [functional_parquet.alltypestiny, RANDOM]
+|     partitions=4/4 files=4 size=10.48KB
+|     stats-rows=unavailable extrapolated-rows=disabled
+|     table stats: rows=unavailable size=unavailable
+|     column stats: unavailable
+|     mem-estimate=16.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=88B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   partitions=24/24 files=24 size=178.13KB
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
+   columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   mem-estimate=16.00MB mem-reservation=0B
+   tuple-ids=0 row-size=88B cardinality=unavailable
+====
+# Low NDV aggregation - should scale down buffers to minimum.
+select c_nationkey, avg(c_acctbal)
+from tpch_parquet.customer
+group by c_nationkey
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=2.12MB
+Per-Host Resource Estimates: Memory=44.00MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=10B cardinality=25
+|
+F01:PLAN FRAGMENT [HASH(c_nationkey)] hosts=1 instances=1
+03:AGGREGATE [FINALIZE]
+|  output: avg:merge(c_acctbal)
+|  group by: c_nationkey
+|  mem-estimate=10.00MB mem-reservation=2.12MB
+|  tuple-ids=2 row-size=10B cardinality=25
+|
+02:EXCHANGE [HASH(c_nationkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=10B cardinality=25
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+01:AGGREGATE [STREAMING]
+|  output: avg(c_acctbal)
+|  group by: c_nationkey
+|  mem-estimate=10.00MB mem-reservation=0B
+|  tuple-ids=1 row-size=10B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer, RANDOM]
+   partitions=1/1 files=1 size=12.34MB
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=12.34MB
+   column stats: all
+   mem-estimate=24.00MB mem-reservation=0B
+   tuple-ids=0 row-size=10B cardinality=150000
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=4.25MB
+Per-Host Resource Estimates: Memory=88.00MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=10B cardinality=25
+|
+F01:PLAN FRAGMENT [HASH(c_nationkey)] hosts=1 instances=2
+03:AGGREGATE [FINALIZE]
+|  output: avg:merge(c_acctbal)
+|  group by: c_nationkey
+|  mem-estimate=10.00MB mem-reservation=2.12MB
+|  tuple-ids=2 row-size=10B cardinality=25
+|
+02:EXCHANGE [HASH(c_nationkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=10B cardinality=25
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
+01:AGGREGATE [STREAMING]
+|  output: avg(c_acctbal)
+|  group by: c_nationkey
+|  mem-estimate=10.00MB mem-reservation=0B
+|  tuple-ids=1 row-size=10B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer, RANDOM]
+   partitions=1/1 files=1 size=12.34MB
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=12.34MB
+   column stats: all
+   mem-estimate=24.00MB mem-reservation=0B
+   tuple-ids=0 row-size=10B cardinality=150000
+====
+# Mid NDV aggregation - should scale down buffers to intermediate size.
+select straight_join l_orderkey, o_orderstatus, count(*)
+from tpch_parquet.lineitem
+    join tpch_parquet.orders on o_orderkey = l_orderkey
+group by 1, 2
+having count(*) = 1
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=83.00MB
+Per-Host Resource Estimates: Memory=165.28MB
+
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+08:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+F03:PLAN FRAGMENT [HASH(l_orderkey,o_orderstatus)] hosts=3 instances=3
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: l_orderkey, o_orderstatus
+|  having: count(*) = 1
+|  mem-estimate=18.04MB mem-reservation=66.00MB
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+06:EXCHANGE [HASH(l_orderkey,o_orderstatus)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: l_orderkey, o_orderstatus
+|  mem-estimate=54.12MB mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF000 <- o_orderkey
+|  mem-estimate=13.11MB mem-reservation=17.00MB
+|  tuple-ids=0,1 row-size=33B cardinality=5757710
+|
+|--05:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=25B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=25B cardinality=1500000
+|
+04:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=8B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   runtime filters: RF000 -> l_orderkey
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=8B cardinality=6001215
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=83.00MB
+Per-Host Resource Estimates: Memory=327.24MB
+
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+08:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+F03:PLAN FRAGMENT [HASH(l_orderkey,o_orderstatus)] hosts=3 instances=6
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: l_orderkey, o_orderstatus
+|  having: count(*) = 1
+|  mem-estimate=10.00MB mem-reservation=33.00MB
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+06:EXCHANGE [HASH(l_orderkey,o_orderstatus)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: l_orderkey, o_orderstatus
+|  mem-estimate=27.06MB mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash-table-id=00
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF000 <- o_orderkey
+|  mem-estimate=6.56MB mem-reservation=8.50MB
+|  tuple-ids=0,1 row-size=33B cardinality=5757710
+|
+|--F05:PLAN FRAGMENT [HASH(l_orderkey)] hosts=2 instances=4
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |  mem-estimate=0B mem-reservation=0B
+|  |
+|  05:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=25B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=25B cardinality=1500000
+|
+04:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=8B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   runtime filters: RF000 -> l_orderkey
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=8B cardinality=6001215
+====
+# High NDV aggregation - should use default buffer size.
+select distinct *
+from tpch_parquet.lineitem
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=264.00MB
+Per-Host Resource Estimates: Memory=3.31GB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3
+03:AGGREGATE [FINALIZE]
+|  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  mem-estimate=1.62GB mem-reservation=264.00MB
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+01:AGGREGATE [STREAMING]
+|  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  mem-estimate=1.62GB mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=528.00MB
+Per-Host Resource Estimates: Memory=6.62GB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=6
+03:AGGREGATE [FINALIZE]
+|  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  mem-estimate=1.62GB mem-reservation=264.00MB
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+01:AGGREGATE [STREAMING]
+|  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  mem-estimate=1.62GB mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+====
+# Aggregation with unknown input - should use default buffer size.
+select string_col, count(*)
+from functional_parquet.alltypestiny
+group by string_col
+---- DISTRIBUTEDPLAN
+Per-Host Resource Reservation: Memory=264.00MB
+Per-Host Resource Estimates: Memory=272.00MB
+WARNING: The following tables are missing relevant table and/or column statistics.
+functional_parquet.alltypestiny
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+F01:PLAN FRAGMENT [HASH(string_col)] hosts=3 instances=3
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: string_col
+|  mem-estimate=128.00MB mem-reservation=264.00MB
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+02:EXCHANGE [HASH(string_col)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: string_col
+|  mem-estimate=128.00MB mem-reservation=0B
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypestiny, RANDOM]
+   partitions=4/4 files=4 size=10.48KB
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
+   column stats: unavailable
+   mem-estimate=16.00MB mem-reservation=0B
+   tuple-ids=0 row-size=16B cardinality=unavailable
+---- PARALLELPLANS
+Per-Host Resource Reservation: Memory=528.00MB
+Per-Host Resource Estimates: Memory=544.00MB
+WARNING: The following tables are missing relevant table and/or column statistics.
+functional_parquet.alltypestiny
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+F01:PLAN FRAGMENT [HASH(string_col)] hosts=3 instances=6
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: string_col
+|  mem-estimate=128.00MB mem-reservation=264.00MB
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+02:EXCHANGE [HASH(string_col)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: string_col
+|  mem-estimate=128.00MB mem-reservation=0B
+|  tuple-ids=1 row-size=24B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypestiny, RANDOM]
+   partitions=4/4 files=4 size=10.48KB
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=unavailable
+   column stats: unavailable
+   mem-estimate=16.00MB mem-reservation=0B
+   tuple-ids=0 row-size=16B cardinality=unavailable
+====
