This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bc997052520cd514543dc5b3630fedc2710d8dc2
Author: Steve Carlin <[email protected]>
AuthorDate: Fri Aug 29 08:18:02 2025 -0700

    IMPALA-13902: Calcite planner: Implement is_spool_query_results
    
    The is_spool_query_results query option is now supported in Calcite. The
    returnAtMostOneRow method is now implemented to support this.
    PlanRootSink is refactored to extract the sanitization of query options
    (a new method, sanitizeSpoolingOptions()) out of
    PlanRootSink.computeResourceProfile(). The bulk of the memory bounding
    calculation is also extracted into a new class, SpoolingMemoryBound.
    
    Added "sleep" to ImpalaOperatorTable.java since some EE tests related to
    result spooling call the sleep() function. Changed ImpalaPlanRel to extend
    the RelNode interface.
    
    A sanity test has been added to calcite.test, but the bulk of the
    testing will be done through the Impala test framework when it is
    enabled.
    
    Testing:
    - Pass FE tests PlannerTest#testResultSpooling, TpcdsCpuCostPlannerTest,
      and all java tests under calcite-planner project.
    - Pass query_test/test_result_spooling.py and
      custom_cluster/test_result_spooling.py.
    
    Co-authored-by: Riza Suminto
    
    Change-Id: I5b9bf49e2874ee12de212b892bd898c296774c6f
    Reviewed-on: http://gerrit.cloudera.org:8080/23562
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/analysis/AnalysisContext.java    |  11 -
 .../java/org/apache/impala/analysis/QueryStmt.java |  12 +-
 .../org/apache/impala/analysis/SelectStmt.java     |   5 +
 .../org/apache/impala/planner/PlanRootSink.java    | 316 ++++++++++++++-------
 .../apache/impala/planner/SingleNodePlanner.java   |   2 +-
 .../calcite/operators/ImpalaOperatorTable.java     |   1 +
 .../impala/calcite/rel/node/ImpalaPlanRel.java     |   9 +-
 .../calcite/service/CalciteSingleNodePlanner.java  |  57 +++-
 .../impala/calcite/service/ExecRequestCreator.java |   2 +-
 .../PlannerTest/calcite_tpcds/tpcds-q13.test       |  24 +-
 .../PlannerTest/calcite_tpcds/tpcds-q48.test       |  24 +-
 .../PlannerTest/calcite_tpcds/tpcds-q87.test       |  16 +-
 .../queries/PlannerTest/result-spooling.test       |   2 +-
 .../queries/QueryTest/calcite.test                 |  10 +
 14 files changed, 325 insertions(+), 166 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java 
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 8a50bbf91..222ad9986 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -497,17 +497,6 @@ public class AnalysisContext {
     authzChecker.postAnalyze(authzCtx);
     ImpalaException analysisException = analysisResult_.getException();
 
-    // A statement that returns at most one row does not need to spool query 
results.
-    // IMPALA-13902: returnsAtMostOneRow should be in planner interface so it 
is
-    // accessible by the Calcite planner.
-    if (analysisException == null && analysisResult_.getStmt() instanceof 
SelectStmt &&
-        ((SelectStmt)analysisResult_.getStmt()).returnsAtMostOneRow()) {
-      clientRequest.query_options.setSpool_query_results(false);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Result spooling is disabled due to the statement returning 
at most "
-            + "one row.");
-      }
-    }
     long durationMs = timeline_.markEvent("Analysis finished") / 1000000;
     LOG.info("Analysis took {} ms", durationMs);
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index b5a036975..29fbcac70 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -20,7 +20,6 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -29,8 +28,6 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.SqlCastException;
-import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.PlanRootSink;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -503,9 +500,12 @@ public abstract class QueryStmt extends StatementBase {
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
   }
 
-  public DataSink createDataSink(List<Expr> resultExprs) {
-    return new PlanRootSink(resultExprs);
-  }
+  /**
+   * Return True if this statement is eligible for result spooling.
+   * Planner can still decide to disable result spooling if there are other 
factors
+   * during analysis that prevents spooling from happening.
+   */
+  public boolean canSpoolResult() { return true; }
 
   public List<OrderByElement> cloneOrderByElements() {
     if (orderByElements_ == null) return null;
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index f96be1986..d152769c8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -1915,4 +1915,9 @@ public class SelectStmt extends QueryStmt {
           (sortInfo_ != null &&
            TreeNode.contains(sortInfo_.getSortExprs(), Expr.IS_AGGREGATE));
   }
+
+  @Override
+  public boolean canSpoolResult() {
+    return !returnsAtMostOneRow();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java 
b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index df4784935..27535cb7d 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -19,6 +19,7 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
+import com.google.common.base.MoreObjects;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDataSink;
@@ -45,11 +46,33 @@ public class PlanRootSink extends DataSink {
   // IMPALA-4268 for details on how this value was chosen.
   private static final long DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY = 10 * 
1024 * 1024;
 
+  /**
+   * Create a PlanRootSink. This method will sanitize spooling-related query 
options
+   * within given PlannerContext before creating the PlanRootSink. Once 
sanitized here,
+   * the spooling-related query options is safe to use directly and should not 
be modified
+   * anywhere else.
+   * @param ctx the planner context.
+   * @param outputExprs the output expressions for the PlanRootSink.
+   * @param requireSpooling hard override to disable result spooling that can 
be supplied
+   *                        by caller.
+   * @return the created PlanRootSink.
+   */
+  public static PlanRootSink create(
+      PlannerContext ctx, List<Expr> outputExprs, boolean requireSpooling) {
+    sanitizeSpoolingOptions(ctx.getQueryOptions(), requireSpooling);
+    return new PlanRootSink(outputExprs);
+  }
+
   // One expression per result column for the query.
   private final List<Expr> outputExprs_;
 
+  /**
+   * DEPRECATED.
+   * Use {@link PlanRootSink#create(PlannerContext, List, boolean)} instead.
+   * This constructor will be made private. It is only temporarily preserved 
to prevent
+   * compilation error.
+   */
   public PlanRootSink(List<Expr> outputExprs) {
-    Preconditions.checkState(outputExprs != null);
     outputExprs_ = outputExprs;
   }
 
@@ -70,8 +93,10 @@ public class PlanRootSink extends DataSink {
 
   @Override
   public void computeProcessingCost(TQueryOptions queryOptions) {
-    if (queryOptions.isSpool_query_results() && 
queryOptions.getScratch_limit() != 0
-        && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) {
+    // TODO: this sanitization should have already happened in 
PlanRootSink.create().
+    // This can be removed once PlanRootSink constructor is made private.
+    sanitizeSpoolingOptions(queryOptions, true);
+    if (queryOptions.isSpool_query_results()) {
       // The processing cost to buffer these many rows in root.
       long outputCardinality = Math.max(0, 
fragment_.getPlanRoot().getCardinality());
       processingCost_ = ProcessingCost.basicCost(
@@ -98,115 +123,136 @@ public class PlanRootSink extends DataSink {
    * the maximum between default spillable buffer size and MAX_ROW_SIZE 
(rounded up to
    * nearest power of 2) to account for the read and write pages in the
    * BufferedTupleStream used by the backend plan-root-sink. The maximum 
reservation is
-   * set to the query-level config MAX_PINNED_RESULT_SPOOLING_MEMORY.
-   *
-   * If SPOOL_QUERY_RESULTS is true but spill is disabled either due to 
SCRATCH_LIMIT = 0
-   * or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false. A 
ResourceProfile
-   * is returned with no reservation or buffer sizes, and the estimated memory 
consumption
-   * is 0.
+   * set to the query-level config MAX_SPILLED_RESULT_SPOOLING_MEM.
    */
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    if (queryOptions.isSpool_query_results()) {
-      // Check if we need to disable result spooling because we can not spill.
-      long scratchLimit = queryOptions.getScratch_limit();
-      String scratchDirs = BackendConfig.INSTANCE.getScratchDirs();
-      if (scratchLimit == 0 || scratchDirs.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Result spooling is disabled due to unavailability of 
scratch "
-              + "space.");
-        }
-        queryOptions.setSpool_query_results(false);
-        resourceProfile_ = ResourceProfile.noReservation(0);
-        return;
-      }
+    // TODO: this sanitization should have already happened in 
PlanRootSink.create().
+    // This can be removed once PlanRootSink constructor is made private.
+    sanitizeSpoolingOptions(queryOptions, true);
+    if (!queryOptions.isSpool_query_results()) {
+      resourceProfile_ = ResourceProfile.noReservation(0);
+    } else {
+      SpoolingMemoryBound memBound = new SpoolingMemoryBound(queryOptions);
+      long memEstimateBytes = 
getMemEstimateBytes(memBound.maxMemReservationBytes_);
 
-      long maxSpoolingMem = queryOptions.getMax_result_spooling_mem();
-      if (maxSpoolingMem <= 0) {
-        // max_result_spooling_mem = 0 means unbounded. But instead of setting 
unlimited
-        // memory reservation, we fallback to the default 
max_result_spooling_mem.
-        TQueryOptions defaults = new TQueryOptions();
-        maxSpoolingMem = defaults.getMax_result_spooling_mem();
-        queryOptions.setMax_result_spooling_mem(maxSpoolingMem);
-      }
+      resourceProfile_ = new ResourceProfileBuilder()
+                             .setMemEstimateBytes(memEstimateBytes)
+                             
.setMinMemReservationBytes(memBound.minMemReservationBytes_)
+                             
.setMaxMemReservationBytes(memBound.maxMemReservationBytes_)
+                             .setMaxRowBufferBytes(memBound.maxRowBufferSize_)
+                             .setSpillableBufferBytes(memBound.bufferSize_)
+                             .build();
+    }
+  }
 
-      long bufferSize = queryOptions.getDefault_spillable_buffer_size();
-      long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
-          bufferSize, queryOptions.getMax_row_size());
-      long minMemReservationBytes = 2 * maxRowBufferSize;
-      long maxMemReservationBytes = Math.max(maxSpoolingMem, 
minMemReservationBytes);
-
-      // User might set query option scratch_limit that is lower than either of
-      // minMemReservationBytes, maxMemReservationBytes, or
-      // max_spilled_result_spooling_mem. We define:
-      //
-      //   maxAllowedScratchLimit = scratchLimit - maxRowBufferSize
-      //
-      // If maxAllowedScratchLimit < minMemReservationBytes, we fall back to 
use
-      // BlockingPlanRootSink in the backend by silently disabling result 
spooling.
-      // If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower
-      // maxMemReservationBytes, max_result_spooling_mem, and
-      // max_spilled_result_spooling_mem accordingly to fit 
maxAllowedScratchLimit.
-      // Otherwise, do nothing.
-      //
-      // BufferedPlanRootSink may slightly exceed its maxMemReservationBytes 
when it
-      // decides to spill. maxRowBufferSize bytes is subtracted in 
maxAllowedScratchLimit
-      // to give extra space, ensuring spill does not exceed scratch_limit.
-      if (scratchLimit > -1) {
-        long maxAllowedScratchLimit = scratchLimit - maxRowBufferSize;
-        if (maxAllowedScratchLimit < minMemReservationBytes) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Result spooling is disabled due to low scratch_limit ("
-                + scratchLimit + "). Try increasing scratch_limit to >= "
-                + (minMemReservationBytes + maxRowBufferSize)
-                + " to enable result spooling.");
-          }
-          queryOptions.setSpool_query_results(false);
-          resourceProfile_ = ResourceProfile.noReservation(0);
-          return;
-        } else if (maxAllowedScratchLimit < maxMemReservationBytes) {
-          maxMemReservationBytes = maxAllowedScratchLimit;
-          queryOptions.setMax_result_spooling_mem(maxAllowedScratchLimit);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("max_result_spooling_mem is lowered to " + 
maxMemReservationBytes
-                + " to fit scratch_limit (" + scratchLimit + ").");
-          }
-        }
+  private long getMemEstimateBytes(long maxMemReservationBytes) {
+    PlanNode inputNode = fragment_.getPlanRoot();
 
-        // If we got here, it means we can use BufferedPlanRootSink with at 
least
-        // minMemReservationBytes in memory. But the amount of memory we can 
spill to disk
-        // may still be limited by scratch_limit. Thus, we need to lower
-        // max_spilled_result_spooling_mem as necessary.
-        if (maxAllowedScratchLimit < 
queryOptions.getMax_spilled_result_spooling_mem()) {
-          
queryOptions.setMax_spilled_result_spooling_mem(maxAllowedScratchLimit);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("max_spilled_result_spooling_mem is lowered to "
-                + maxAllowedScratchLimit + " to fit scratch_limit ("
-                + scratchLimit + ").");
-          }
-        }
-      }
+    long memEstimateBytes;
+    if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
+      memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY;
+    } else {
+      long inputCardinality = Math.max(1L, inputNode.getCardinality());
+      memEstimateBytes = (long) Math.ceil(inputCardinality * 
inputNode.getAvgRowSize());
+    }
+    memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes);
+    return memEstimateBytes;
+  }
 
-      PlanNode inputNode = fragment_.getPlanRoot();
+  /**
+   * Helper method to disable SPOOL_QUERY_RESULTS options if it is not 
possible to do
+   * so. This method will check various limits around scratch space and 
disable spooling
+   * option if such limit will be exceeded. Once disabled, SPOOL_QUERY_RESULTS 
option
+   * should not be enabled anywhere else.
+   *
+   * If SPOOL_QUERY_RESULTS is true but spill is disabled either due to 
SCRATCH_LIMIT = 0
+   * or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false.
+   *
+   * If this method does not disable spooling option, sanitize 
MAX_RESULT_SPOOLING_MEM
+   * and MAX_SPILLED_RESULT_SPOOLING_MEM option into a valid value.
+   *
+   * @param queryOptions query options to potentially modify.
+   * @param requireSpooling hard override to disable result spooling that can 
be supplied
+   *                        by caller.
+   */
+  private static void sanitizeSpoolingOptions(
+      TQueryOptions queryOptions, boolean requireSpooling) {
+    if (!requireSpooling || !queryOptions.isSpool_query_results()) {
+      queryOptions.setSpool_query_results(false);
+      return;
+    }
 
-      long memEstimateBytes;
-      if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) 
{
-        memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY;
-      } else {
-        long inputCardinality = Math.max(1L, inputNode.getCardinality());
-        memEstimateBytes = (long) Math.ceil(inputCardinality * 
inputNode.getAvgRowSize());
+    // Check if we need to disable result spooling because we can not spill.
+    long scratchLimit = queryOptions.getScratch_limit();
+    String scratchDirs = BackendConfig.INSTANCE.getScratchDirs();
+    if (scratchLimit == 0 || scratchDirs.isEmpty()) {
+      LOG.info("Result spooling is disabled due to unavailability of scratch 
space.");
+      queryOptions.setSpool_query_results(false);
+      return;
+    }
+
+    long maxSpoolingMem = queryOptions.getMax_result_spooling_mem();
+    if (maxSpoolingMem <= 0) {
+      // max_result_spooling_mem = 0 means unbounded. But instead of setting 
unlimited
+      // memory reservation, we fallback to the default 
max_result_spooling_mem.
+      TQueryOptions defaults = new TQueryOptions();
+      maxSpoolingMem = defaults.getMax_result_spooling_mem();
+      queryOptions.setMax_result_spooling_mem(maxSpoolingMem);
+    }
+
+    SpoolingMemoryBound memBound = new SpoolingMemoryBound(queryOptions);
+    if (!memBound.hasSufficientScratchLimit()) {
+      LOG.info("Result spooling is disabled due to low scratch_limit ("
+          + memBound.scratchLimit_ + "). Try increasing scratch_limit to >= "
+          + (memBound.minRequiredScratchLimit()) + " to enable result 
spooling.");
+      queryOptions.setSpool_query_results(false);
+      return;
+    }
+
+    // User might set query option scratch_limit that is lower than either of
+    // minMemReservationBytes, maxMemReservationBytes, or
+    // max_spilled_result_spooling_mem. We define:
+    //
+    //   maxAllowedScratchLimit = scratchLimit - maxRowBufferSize
+    //
+    // If maxAllowedScratchLimit < minMemReservationBytes, we fall back to use
+    // BlockingPlanRootSink in the backend by silently disabling result 
spooling.
+    // If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower
+    // max_result_spooling_mem and max_spilled_result_spooling_mem accordingly 
to fit
+    // maxAllowedScratchLimit. Otherwise, do nothing.
+    //
+    // BufferedPlanRootSink may slightly exceed its maxMemReservationBytes 
when it
+    // decides to spill. maxRowBufferSize bytes is subtracted in 
maxAllowedScratchLimit
+    // to give extra space, ensuring spill does not exceed scratch_limit.
+    if (memBound.hasBoundedScratchLimit()) {
+      Preconditions.checkState(memBound.scratchLimit_ > 0);
+      if (memBound.maxAllowedScratchLimit_ < memBound.minMemReservationBytes_) 
{
+        LOG.info("Result spooling is disabled due to low scratch_limit (" + 
scratchLimit
+            + "). Try increasing scratch_limit to >= "
+            + memBound.minRequiredScratchLimit() + " to enable result 
spooling.");
+        queryOptions.setSpool_query_results(false);
+        return;
       }
-      memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes);
 
-      resourceProfile_ = new ResourceProfileBuilder()
-                             .setMemEstimateBytes(memEstimateBytes)
-                             .setMinMemReservationBytes(minMemReservationBytes)
-                             .setMaxMemReservationBytes(maxMemReservationBytes)
-                             .setMaxRowBufferBytes(maxRowBufferSize)
-                             .setSpillableBufferBytes(bufferSize)
-                             .build();
-    } else {
-      resourceProfile_ = ResourceProfile.noReservation(0);
+      if (memBound.maxMemReservationBytes_ < 
queryOptions.getMax_result_spooling_mem()) {
+        
queryOptions.setMax_result_spooling_mem(memBound.maxMemReservationBytes_);
+        LOG.info("max_result_spooling_mem is lowered to "
+            + memBound.maxMemReservationBytes_ + " to fit scratch_limit ("
+            + memBound.scratchLimit_ + ").");
+      }
+
+      // If we got here, it means we can use BufferedPlanRootSink with at least
+      // minMemReservationBytes in memory. But the amount of memory we can 
spill to disk
+      // may still be limited by scratch_limit. Thus, we need to lower
+      // max_spilled_result_spooling_mem as necessary.
+      if (memBound.maxAllowedScratchLimit_
+          < queryOptions.getMax_spilled_result_spooling_mem()) {
+        
queryOptions.setMax_spilled_result_spooling_mem(memBound.maxAllowedScratchLimit_);
+        LOG.info("max_spilled_result_spooling_mem is lowered to "
+            + memBound.maxAllowedScratchLimit_ + " to fit scratch_limit ("
+            + memBound.scratchLimit_ + ").");
+      }
     }
   }
 
@@ -232,4 +278,70 @@ public class PlanRootSink extends DataSink {
     super.computeRowConsumptionAndProductionToCost();
     fragment_.setFixedInstanceCount(fragment_.getNumInstances());
   }
+
+  private static class SpoolingMemoryBound {
+    // Values taken as is from query options.
+    public final long scratchLimit_;
+    public final long bufferSize_;
+    public final long maxRowSize_;
+    public final long maxSpoolingMem_;
+
+    // Derived values from query options.
+    public final long maxRowBufferSize_;
+    public final long minMemReservationBytes_;
+    public final long maxMemReservationBytes_;
+
+    // Only valid if scratch limit is bounded.
+    public final long maxAllowedScratchLimit_;
+
+    public SpoolingMemoryBound(TQueryOptions queryOptions) {
+      Preconditions.checkArgument(queryOptions.isSpool_query_results());
+      Preconditions.checkArgument(queryOptions.getMax_result_spooling_mem() > 
0);
+
+      scratchLimit_ = queryOptions.getScratch_limit();
+      bufferSize_ = queryOptions.getDefault_spillable_buffer_size();
+      maxRowSize_ = queryOptions.getMax_row_size();
+      maxSpoolingMem_ = queryOptions.getMax_result_spooling_mem();
+
+      maxRowBufferSize_ =
+          PlanNode.computeMaxSpillableBufferSize(bufferSize_, maxRowSize_);
+      minMemReservationBytes_ = 2 * maxRowBufferSize_;
+
+      long maxAllowedScratchLimit = 
queryOptions.getMax_spilled_result_spooling_mem();
+      long maxMemReservationBytes = Math.max(maxSpoolingMem_, 
minMemReservationBytes_);
+      if (hasBoundedScratchLimit()) {
+        maxAllowedScratchLimit = scratchLimit_ - maxRowBufferSize_;
+        if (maxAllowedScratchLimit < maxMemReservationBytes) {
+          maxMemReservationBytes = maxAllowedScratchLimit;
+        }
+      }
+      maxAllowedScratchLimit_ = maxAllowedScratchLimit;
+      maxMemReservationBytes_ = maxMemReservationBytes;
+    }
+
+    public boolean hasBoundedScratchLimit() { return scratchLimit_ > -1; }
+
+    public long minRequiredScratchLimit() {
+      return minMemReservationBytes_ + maxRowBufferSize_;
+    }
+
+    public boolean hasSufficientScratchLimit() {
+      return !hasBoundedScratchLimit()
+          || maxAllowedScratchLimit_ >= minMemReservationBytes_;
+    }
+
+    @Override
+    public String toString() {
+      MoreObjects.ToStringHelper toStrHelper = 
MoreObjects.toStringHelper(this);
+      toStrHelper.add("scratchLimit", scratchLimit_);
+      toStrHelper.add("bufferSize", bufferSize_);
+      toStrHelper.add("maxRowSize", maxRowSize_);
+      toStrHelper.add("maxSpoolingMem", maxSpoolingMem_);
+      toStrHelper.add("maxRowBufferSize", maxRowBufferSize_);
+      toStrHelper.add("minMemReservationBytes", minMemReservationBytes_);
+      toStrHelper.add("maxMemReservationBytes", maxMemReservationBytes_);
+      toStrHelper.add("maxAllowedScratchLimit", maxAllowedScratchLimit_);
+      return toStrHelper.toString();
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 2f8eff98d..a6e83b62c 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -2380,7 +2380,7 @@ public class SingleNodePlanner implements 
SingleNodePlannerIntf {
     QueryStmt queryStmt = ctx_.getQueryStmt();
     queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
     List<Expr> resultExprs = queryStmt.getResultExprs();
-    return ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs);
+    return PlanRootSink.create(ctx_, resultExprs, queryStmt.canSpoolResult());
   }
 
   @Override
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java
index cb198582f..dddd62486 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java
@@ -84,6 +84,7 @@ public class ImpalaOperatorTable extends 
ReflectiveSqlOperatorTable {
       .add("regr_count")
       .add("localtime")
       .add("translate")
+      .add("sleep")
       .build();
 
   private static ImpalaOperatorTable INSTANCE;
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
index 3ba9e8087..0d49af089 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.calcite.rel.node;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
@@ -31,8 +33,7 @@ import org.apache.impala.common.ImpalaException;
 /**
  * ImpalaPlanRel. Interface used for all Impala intermediary RelNodes
  */
-public interface ImpalaPlanRel {
-
+public interface ImpalaPlanRel extends RelNode {
   /**
    * Enum representing the type of class used in the RelNode
    * Using an enum here so that Impala Plan RelNodes can be used in
@@ -99,8 +100,8 @@ public interface ImpalaPlanRel {
    * mentioned below can be in between the Aggregate RelNode and the Table Scan
    * RelNode.
    */
-  public static boolean canPassThroughParentAggregate(ImpalaPlanRel planRel) {
-    switch (getRelNodeType((RelNode) planRel)) {
+  public static boolean canPassThroughParentAggregate(RelNode relNode) {
+    switch (getRelNodeType(relNode)) {
       case FILTER:
       case PROJECT:
         return true;
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java
index 6dbc6aa7d..de74ffd54 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java
@@ -17,28 +17,26 @@
 
 package org.apache.impala.calcite.service;
 
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.calcite.rel.RelNode;
-import org.apache.impala.analysis.AnalysisDriver;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Values;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.ParsedStatement;
 import org.apache.impala.calcite.rel.node.ImpalaPlanRel;
 import org.apache.impala.calcite.rel.node.NodeWithExprs;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.PlannerContext;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.PlanRootSink;
+import org.apache.impala.planner.PlannerContext;
 import org.apache.impala.planner.SingleNodePlannerIntf;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TResultSetMetadata;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 
 /**
  * Implementation of the SingleNodePlannerIntf which returns a PlanNode
@@ -53,6 +51,7 @@ public class CalciteSingleNodePlanner implements 
SingleNodePlannerIntf {
   private final CalciteAnalysisResult analysisResult_;
   private NodeWithExprs rootNode_;
   private List<String> fieldNames_;
+  private boolean returnsMoreThanOneRow_;
 
   public CalciteSingleNodePlanner(PlannerContext ctx) {
     ctx_ = ctx;
@@ -71,6 +70,8 @@ public class CalciteSingleNodePlanner implements 
SingleNodePlannerIntf {
         new CalciteOptimizer(analysisResult_, ctx_.getTimeline());
     ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan);
 
+    returnsMoreThanOneRow_ = returnsMoreThanOneRow(optimizedPlan);
+
     // Create Physical Impala PlanNodes
     CalcitePhysPlanCreator physPlanCreator =
         new CalcitePhysPlanCreator(analysisResult_.getAnalyzer(), ctx_);
@@ -86,7 +87,47 @@ public class CalciteSingleNodePlanner implements 
SingleNodePlannerIntf {
    */
   @Override
   public DataSink createDataSink(ExprSubstitutionMap rootNodeSmap) {
-    return new PlanRootSink(rootNode_.outputExprs_);
+    return PlanRootSink.create(ctx_, rootNode_.outputExprs_, 
returnsMoreThanOneRow_);
+  }
+
+  private boolean returnsMoreThanOneRow(RelNode logicalPlan) {
+    return !isSingleRowValues(logicalPlan) && !hasOneRowAgg(logicalPlan);
+  }
+
+  private boolean isSingleRowValues(RelNode relNode) {
+    // A project will keep the row count the same. Theoretically, a filter
+    // can reduce the row count, but there should not be any filter
+    // over a values clause because the optimization rules should take
+    // care of this situation.
+    while (relNode instanceof Project) {
+      relNode = relNode.getInput(0);
+    }
+
+    if (!(relNode instanceof Values)) {
+      return false;
+    }
+
+    Values values = (Values) relNode;
+    return Values.isEmpty(values) || Values.isSingleValue(values);
+  }
+
+  /**
+   * Checks if there is an aggregation at the root that guarantees there
+   * will be at most one row. Avoid aggs that have groups via a group keyword
+   * or a distinct keyword.
+   */
+  private boolean hasOneRowAgg(RelNode relNode) {
+    while (ImpalaPlanRel.canPassThroughParentAggregate(relNode)) {
+      relNode = relNode.getInput(0);
+    }
+    if (!(relNode instanceof Aggregate)) {
+      return false;
+    }
+    Aggregate agg = (Aggregate) relNode;
+    if (agg.getGroupCount() > 0 || agg.containsDistinctCall()) {
+      return false;
+    }
+    return true;
   }
 
   @Override
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
index 3e5b90913..a60d0eb7a 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
@@ -250,7 +250,7 @@ public class ExecRequestCreator implements CompilerStep {
     rootFragment.verifyTree();
 
     List<Expr> resultExprs = outputExprs;
-    rootFragment.setSink(new PlanRootSink(resultExprs));
+    rootFragment.setSink(PlanRootSink.create(ctx, resultExprs, true));
 
     Planner.checkForDisableCodegen(rootFragment.getPlanRoot(), ctx);
     // finalize exchanges: this ensures that for hash partitioned joins, the partitioning
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test
index 0b912d56c..e5c36984e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test
@@ -55,10 +55,10 @@ Max Per-Host Resource Reservation: Memory=72.52MB Threads=1
 Per-Host Resource Estimates: Memory=269MB
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=268.97MB mem-reservation=72.52MB 
thread-reservation=1 runtime-filters-memory=6.00MB
-|  max-parallelism=1 segment-costs=[1440342244, 4]
+|  max-parallelism=1 segment-costs=[1440342244, 0]
 PLAN-ROOT SINK
 |  output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), 
sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=4
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 11:AGGREGATE [FINALIZE]
 |  output: avg(CAST(tpcds_partitioned_parquet_snap.store_sales.ss_quantity AS 
BIGINT)), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), 
sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
@@ -184,14 +184,14 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=40B cardinality=176.68M(filtered from 863.99M) 
cost=1216284982
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=404.45MB Threads=24
-Per-Host Resource Estimates: Memory=677MB
+Max Per-Host Resource Reservation: Memory=400.45MB Threads=24
+Per-Host Resource Estimates: Memory=673MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
-|  max-parallelism=1 segment-costs=[68, 4] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
+|  Per-Instance Resources: mem-estimate=68.03KB mem-reservation=0B 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[68, 0] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
 PLAN-ROOT SINK
 |  output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), 
sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=4
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 19:AGGREGATE [FINALIZE]
 |  output: avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), 
avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), 
avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), 
sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
@@ -425,14 +425,14 @@ max-parallelism=150 segment-costs=[1456086510]
    tuple-ids=0 row-size=40B cardinality=176.68M(filtered from 863.99M) 
cost=1216284982
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=404.45MB Threads=24
-Per-Host Resource Estimates: Memory=677MB
+Max Per-Host Resource Reservation: Memory=400.45MB Threads=24
+Per-Host Resource Estimates: Memory=673MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
-|  max-parallelism=1 segment-costs=[68, 4] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
+|  Per-Instance Resources: mem-estimate=68.03KB mem-reservation=0B 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[68, 0] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
 PLAN-ROOT SINK
 |  output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), 
avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), 
sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=4
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 19:AGGREGATE [FINALIZE]
 |  output: avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), 
avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), 
avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), 
sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test
index b4a998a19..0a9dfbaaa 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test
@@ -71,10 +71,10 @@ Max Per-Host Resource Reservation: Memory=69.52MB Threads=1
 Per-Host Resource Estimates: Memory=250MB
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=250.04MB mem-reservation=69.52MB 
thread-reservation=1 runtime-filters-memory=5.00MB
-|  max-parallelism=1 segment-costs=[1126444413, 1]
+|  max-parallelism=1 segment-costs=[1126444413, 0]
 PLAN-ROOT SINK
 |  output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=1
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 09:AGGREGATE [FINALIZE]
 |  output: sum(CAST(tpcds_partitioned_parquet_snap.store_sales.ss_quantity AS 
BIGINT))
@@ -179,14 +179,14 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=28B cardinality=176.68M(filtered from 863.99M) 
cost=910976956
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=379.14MB Threads=22
-Per-Host Resource Estimates: Memory=622MB
+Max Per-Host Resource Reservation: Memory=375.14MB Threads=22
+Per-Host Resource Estimates: Memory=618MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
-|  max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
 PLAN-ROOT SINK
 |  output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=1
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 16:AGGREGATE [FINALIZE]
 |  output: sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
@@ -382,14 +382,14 @@ max-parallelism=130 segment-costs=[1221102531]
    tuple-ids=0 row-size=28B cardinality=176.68M(filtered from 863.99M) 
cost=910976956
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=379.14MB Threads=22
-Per-Host Resource Estimates: Memory=622MB
+Max Per-Host Resource Reservation: Memory=375.14MB Threads=22
+Per-Host Resource Estimates: Memory=618MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
-|  max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=130 [max(1 
(self) vs 130 (sum children))]
 PLAN-ROOT SINK
 |  output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=1
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 16:AGGREGATE [FINALIZE]
 |  output: sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test
index 997ac1342..b0286a601 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test
@@ -27,10 +27,10 @@ Max Per-Host Resource Reservation: Memory=192.94MB Threads=1
 Per-Host Resource Estimates: Memory=52.18GB
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=52.18GB mem-reservation=192.94MB 
thread-reservation=1 runtime-filters-memory=51.00MB
-|  max-parallelism=1 segment-costs=[54918503750, 27478414018, 72455093642, 
13789590829, 19351054919, 22241751, 1]
+|  max-parallelism=1 segment-costs=[54918503750, 27478414018, 72455093642, 
13789590829, 19351054919, 22241751, 0]
 PLAN-ROOT SINK
 |  output exprs: count()
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=1
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 22:AGGREGATE [FINALIZE]
 |  output: count()
@@ -238,11 +238,11 @@ PLAN-ROOT SINK
 Max Per-Host Resource Reservation: Memory=4.62GB Threads=85
 Per-Host Resource Estimates: Memory=59.85GB
 F16:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
-|  max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=360 [max(1 
(self) vs 360 (sum children))]
+|  Per-Instance Resources: mem-estimate=184.84KB mem-reservation=0B 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=360 [max(1 
(self) vs 360 (sum children))]
 PLAN-ROOT SINK
 |  output exprs: count()
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=1
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 40:AGGREGATE [FINALIZE]
 |  output: count:merge()
@@ -641,11 +641,11 @@ max-parallelism=1824 segment-costs=[54881581784, 24500370827] cpu-comparison-res
 Max Per-Host Resource Reservation: Memory=4.62GB Threads=85
 Per-Host Resource Estimates: Memory=59.85GB
 F16:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
-|  max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=360 [max(1 
(self) vs 360 (sum children))]
+|  Per-Instance Resources: mem-estimate=184.84KB mem-reservation=0B 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=360 [max(1 
(self) vs 360 (sum children))]
 PLAN-ROOT SINK
 |  output exprs: count()
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=1
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
 |
 40:AGGREGATE [FINALIZE]
 |  output: count:merge()
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
index df28b3fc0..9fb016d01 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
@@ -71,7 +71,7 @@ Per-Instance Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-rese
    in pipelines: 00(GETNEXT)
 ====
 # Validate that the maximum memory reservation for PLAN-ROOT SINK is bounded by
-# MAX_PINNED_RESULT_SPOOLING_MEMORY.
+# MAX_SPILLED_RESULT_SPOOLING_MEM.
 select * from tpch.lineitem order by l_orderkey
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=24.00MB Threads=3
diff --git a/testdata/workloads/functional-query/queries/QueryTest/calcite.test b/testdata/workloads/functional-query/queries/QueryTest/calcite.test
index db57ca70c..5cccf98b8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/calcite.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/calcite.test
@@ -1128,3 +1128,13 @@ NaN,Inf
 ---- TYPES
 DOUBLE, FLOAT
 ====
+---- QUERY
+select count(*) from functional.alltypestiny;
+---- RUNTIME_PROFILE
+row_regex: .*SPOOL_QUERY_RESULTS=0.*
+====
+---- QUERY
+select * from (values(0));
+---- RUNTIME_PROFILE
+row_regex: .*SPOOL_QUERY_RESULTS=0.*
+====


Reply via email to