This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.4.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5bf979d03bf131de86cca5f979e9d88d3240305f
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Jun 11 09:50:15 2024 -0700

    IMPALA-13152: Avoid NaN, infinite, and negative ProcessingCost
    
    TOP-N cost will turn into NaN if inputCardinality is equal to 0 due to
    Math.log(inputCardinality). This patch fix the issue by avoiding
    Math.log(0) and replace it with 0 instead.
    
    After this patch, Instantiating BaseProcessingCost with NaN, infinite,
    or negative totalCost will throw IllegalArgumentException. In
    BaseProcessingCost.getDetails(), "total-cost" is renamed to "raw-cost"
    to avoid confusion with "cost-total" in ProcessingCost.getDetails().
    
    Testing:
    - Add testcase that run TOP-N query over empty table.
    - Compute ProcessingCost in most FE and EE test even when
      COMPUTE_PROCESSING_COST option is not enabled by checking if
      RuntimeEnv.INSTANCE.isTestEnv() is True or TEST_REPLAN option is
      enabled.
    - Pass core test.
    
    Change-Id: Ib49c7ae397dadcb2cb69fde1850d442d33cdf177
    Reviewed-on: http://gerrit.cloudera.org:8080/21504
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 common/thrift/ImpalaService.thrift                 |  4 +-
 .../org/apache/impala/analysis/AggregateInfo.java  |  4 ++
 .../org/apache/impala/planner/AggregationNode.java | 10 +--
 .../apache/impala/planner/BaseProcessingCost.java  |  8 ++-
 .../impala/planner/BroadcastProcessingCost.java    |  2 +-
 .../org/apache/impala/planner/CostingSegment.java  |  3 +-
 .../java/org/apache/impala/planner/DataSink.java   |  6 +-
 .../org/apache/impala/planner/DataStreamSink.java  |  2 +-
 .../apache/impala/planner/DistributedPlanner.java  |  1 -
 .../org/apache/impala/planner/ExchangeNode.java    |  2 +-
 .../org/apache/impala/planner/HashJoinNode.java    |  9 +--
 .../org/apache/impala/planner/HdfsScanNode.java    |  3 +-
 .../org/apache/impala/planner/HdfsTableSink.java   |  2 +-
 .../apache/impala/planner/IcebergDeleteNode.java   |  6 +-
 .../java/org/apache/impala/planner/JoinNode.java   |  6 +-
 .../apache/impala/planner/NestedLoopJoinNode.java  |  2 +-
 .../org/apache/impala/planner/PlanFragment.java    |  2 +-
 .../java/org/apache/impala/planner/PlanNode.java   |  4 +-
 .../org/apache/impala/planner/PlanRootSink.java    |  6 +-
 .../java/org/apache/impala/planner/Planner.java    | 24 ++++---
 .../org/apache/impala/planner/ProcessingCost.java  | 20 ++++--
 .../impala/planner/ScaledProcessingCost.java       |  3 +-
 .../java/org/apache/impala/planner/ScanNode.java   |  5 +-
 .../java/org/apache/impala/planner/SortNode.java   |  9 +--
 .../apache/impala/planner/SumProcessingCost.java   |  6 +-
 .../java/org/apache/impala/planner/UnionNode.java  |  2 +-
 .../org/apache/impala/analysis/ExprNdvTest.java    | 12 ++++
 .../org/apache/impala/planner/CardinalityTest.java | 17 +++++
 .../queries/PlannerTest/tpcds-processing-cost.test | 78 ++++++++++++++++++++++
 .../tpcds_cpu_cost/tpcds-q43-verbose.test          | 36 +++++-----
 30 files changed, 218 insertions(+), 76 deletions(-)

diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index fdf7710dc..3884d2fd1 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -736,7 +736,9 @@ enum TImpalaQueryOptions {
   // If true, replanning is enabled.
   ENABLE_REPLAN = 143;
 
-  // If true, test replan by imposing artificial two executor groups in FE.
+  // If true, test replan by imposing artificial two executor groups in FE and 
always
+  // compute ProcessingCost. The degree of parallelism adjustment, however, 
still require
+  // COMPUTE_PROCESSING_COST option set to true.
   TEST_REPLAN = 144;
 
   // Maximum wait time on HMS ACID lock in seconds.
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java 
b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
index 1eb5cc161..25cd8ebb8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
@@ -734,6 +734,10 @@ public class AggregateInfo extends AggregateInfoBase {
 
   public ProcessingCost computeProcessingCost(
       String label, long inputCardinality, long intermediateOutputCardinality) 
{
+    Preconditions.checkArgument(
+        inputCardinality >= 0, "inputCardinality should not be negative!");
+    Preconditions.checkArgument(intermediateOutputCardinality >= 0,
+        "intermediateOutputCardinality should not be negative!");
     // Benchmarking suggests we can estimate the processing cost as a linear 
function
     // based the probe input cardinality, the "intermediate" output 
cardinality, and
     // an incremental cost per input row for each additional aggregate 
function.
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index c48e8fbba..aa0a5ce5c 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -575,13 +575,15 @@ public class AggregationNode extends PlanNode {
       // duplicate keys across fragments.  Calculate an overall "intermediate" 
output
       // cardinality that attempts to account for the dups. Cap it at the input
       // cardinality because an aggregation cannot increase the cardinality.
-      long inputCardinality = getAggClassNumGroup(prevAgg, aggInfo);
+      long inputCardinality = Math.max(0, getAggClassNumGroup(prevAgg, 
aggInfo));
       long perInstanceNdv = fragment_.getPerInstanceNdvForCpuCosting(
           inputCardinality, aggInfo.getGroupingExprs());
-      long intermediateOutputCardinality = Math.min(
-          inputCardinality, perInstanceNdv * 
fragment_.getNumInstancesForCosting());
+      long intermediateOutputCardinality = Math.max(0,
+          Math.min(
+              inputCardinality, perInstanceNdv * 
fragment_.getNumInstancesForCosting()));
+      long aggClassNumGroup = Math.max(0, getAggClassNumGroup(prevAgg, 
aggInfo));
       ProcessingCost aggCost = aggInfo.computeProcessingCost(getDisplayLabel(),
-          getAggClassNumGroup(prevAgg, aggInfo), 
intermediateOutputCardinality);
+          aggClassNumGroup, intermediateOutputCardinality);
       processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost);
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java 
b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
index 104205986..67d189da9 100644
--- a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
+++ b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.planner;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A basic implementation of {@link ProcessingCost} that takes account 
expression cost
  * and average row size as per-row costing weight.
@@ -42,6 +44,10 @@ public class BaseProcessingCost extends ProcessingCost {
   }
 
   public BaseProcessingCost(double totalCost) {
+    Preconditions.checkArgument(!Double.isNaN(totalCost), "totalCost must not 
be a NaN!");
+    Preconditions.checkArgument(
+        Double.isFinite(totalCost), "totalCost must be a finite double!");
+    Preconditions.checkArgument(totalCost >= 0, "totalCost must not be a 
negative!");
     cardinality_ = 0L;
     exprsCost_ = 0.0F;
     materializationCost_ = 0.0F;
@@ -74,7 +80,7 @@ public class BaseProcessingCost extends ProcessingCost {
   public String getDetails() {
     StringBuilder output = new StringBuilder();
     output.append(super.getDetails());
-    output.append(" total-cost=").append(totalCost_);
+    output.append(" raw-cost=").append(totalCost_);
     if (cardinality_ != 0L) { output.append(" 
cardinality=").append(cardinality_); }
     return output.toString();
   }
diff --git 
a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java 
b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
index d031ecea6..6f214c1ef 100644
--- a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
+++ b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
@@ -36,7 +36,7 @@ public class BroadcastProcessingCost extends ProcessingCost {
   protected BroadcastProcessingCost(
       ProcessingCost cost, Supplier<Integer> countSupplier) {
     Preconditions.checkArgument(
-        cost.isValid(), "BroadcastProcessingCost: cost is invalid!");
+        cost.isValid(), "BroadcastProcessingCost: cost is invalid! %s", cost);
     childProcessingCost_ = cost;
     setNumInstanceExpected(countSupplier);
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java 
b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
index 634eea302..e7c884be2 100644
--- a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
+++ b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
@@ -120,7 +120,8 @@ public class CostingSegment extends 
TreeNode<CostingSegment> {
   }
 
   private void appendCost(ProcessingCost additionalCost) {
-    Preconditions.checkArgument(additionalCost.isValid());
+    Preconditions.checkArgument(
+        additionalCost.isValid(), "additionalCost is invalid! %s", 
additionalCost);
     ProcessingCost newTotalCost = ProcessingCost.sumCost(additionalCost, 
cost_);
     newTotalCost.setNumRowToConsume(cost_.getNumRowToConsume());
     newTotalCost.setNumRowToProduce(additionalCost.getNumRowToConsume());
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataSink.java
index 2cda96a02..97571207d 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java
@@ -138,9 +138,9 @@ public abstract class DataSink {
    */
   public void computeRowConsumptionAndProductionToCost() {
     Preconditions.checkState(processingCost_.isValid(),
-        "Processing cost of DataSink " + fragment_.getId() + ":" + getLabel()
-            + " is invalid!");
-    long inputOutputCardinality = fragment_.getPlanRoot().getCardinality();
+        "Processing cost of DataSink %s:%s is invalid! %s", fragment_.getId(), 
getLabel(),
+        processingCost_);
+    long inputOutputCardinality = Math.max(0, 
fragment_.getPlanRoot().getCardinality());
     processingCost_.setNumRowToConsume(inputOutputCardinality);
     processingCost_.setNumRowToProduce(inputOutputCardinality);
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index 29edd9e7a..5ea939a7c 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -100,7 +100,7 @@ public class DataStreamSink extends DataSink {
   public void computeProcessingCost(TQueryOptions queryOptions) {
     // The sending part of the processing cost for the exchange node.
 
-    long outputCardinality = exchNode_.getFilteredCardinality();
+    long outputCardinality = Math.max(0, exchNode_.getFilteredCardinality());
     long outputSize = (long) (exchNode_.getAvgDeserializedRowSize() * 
outputCardinality);
     double totalCost = 0.0;
     String exchType;
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index e984b8615..42da9621e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -20,7 +20,6 @@ package org.apache.impala.planner;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DmlStatementBase;
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java 
b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index cde1d2d49..9f24c5809 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -253,7 +253,7 @@ public class ExchangeNode extends PlanNode {
     //      bottom sending fragment;
     //   2. The receiving processing cost in the top receiving fragment which 
is computed
     //      here.
-    long inputCardinality = getChild(0).getFilteredCardinality();
+    long inputCardinality = Math.max(0, getChild(0).getFilteredCardinality());
 
     // It's not obvious whether the per-byte CPU costs are more accurately 
estimated
     // using the serialized or deserialized sizes, but the coefficients were 
determined
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 766e6fac7..96cae6785 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -339,12 +339,13 @@ public class HashJoinNode extends JoinNode {
     // Compute the processing cost for lhs. Benchmarking suggests we can 
estimate the
     // probe cost as a linear function combining the probe input cardinality 
and the
     // estimated output cardinality.
+    long outputCardinality = Math.max(0, getCardinality());
     double totalProbeCost =
         (getProbeCardinalityForCosting() * COST_COEFFICIENT_PROBE_INPUT)
-        + (getCardinality() * COST_COEFFICIENT_HASH_JOIN_OUTPUT);
+        + (outputCardinality * COST_COEFFICIENT_HASH_JOIN_OUTPUT);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Probe CPU cost estimate: " + totalProbeCost + ", Input Card: "
-          + getProbeCardinalityForCosting() + ", Output Card: " + 
getCardinality());
+          + getProbeCardinalityForCosting() + ", Output Card: " + 
outputCardinality);
     }
     ProcessingCost probeProcessingCost =
         ProcessingCost.basicCost(getDisplayLabel(), totalProbeCost);
@@ -357,8 +358,8 @@ public class HashJoinNode extends JoinNode {
     // build fragment count is fixed for broadcast(at num hosts) regardless of 
the cost
     // computed here. But we should clean up the costing here to avoid any 
future
     // confusion.
-    double totalBuildCost =
-        getChild(1).getFilteredCardinality() * COST_COEFFICIENT_BUILD_INPUT;
+    long buildCardinality = Math.max(0, getChild(1).getFilteredCardinality());
+    double totalBuildCost = buildCardinality * COST_COEFFICIENT_BUILD_INPUT;
     ProcessingCost buildProcessingCost =
         ProcessingCost.basicCost(getDisplayLabel() + " Build side", 
totalBuildCost);
     return Pair.create(probeProcessingCost, buildProcessingCost);
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 23287e176..593168bc6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -2228,8 +2228,6 @@ public class HdfsScanNode extends ScanNode {
    */
   @Override
   protected ProcessingCost computeScanProcessingCost(TQueryOptions 
queryOptions) {
-    Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
-
     long inputCardinality = getFilteredInputCardinality();
     long estBytes = 0L;
     double bytesCostCoefficient = 0.0;
@@ -2255,6 +2253,7 @@ public class HdfsScanNode extends ScanNode {
       }
       return ProcessingCost.basicCost(getDisplayLabel(), totalCost);
     } else {
+      // Input cardinality is 0 or unknown. Fallback to superclass.
       return super.computeScanProcessingCost(queryOptions);
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index d6460ae88..5ba8e8f85 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -158,7 +158,7 @@ public class HdfsTableSink extends TableSink {
   @Override
   public void computeProcessingCost(TQueryOptions queryOptions) {
     PlanNode inputNode = fragment_.getPlanRoot();
-    long cardinality = inputNode.getCardinality();
+    long cardinality = Math.max(0, inputNode.getCardinality());
     float avgRowDataSize = inputNode.getAvgRowSizeWithoutPad();
     long estBytesInserted = (long) Math.ceil(avgRowDataSize * (double) 
cardinality);
     double totalCost = 0.0F;
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
index 2da88274b..5830d5927 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
@@ -228,9 +228,9 @@ public class IcebergDeleteNode extends JoinNode {
             getProbeCardinalityForCosting(), eqJoinPredicateEvalCost);
 
     // Compute the processing cost for rhs.
-    ProcessingCost buildProcessingCost =
-        ProcessingCost.basicCost(getDisplayLabel() + " Build side",
-            getChild(1).getCardinality(), eqJoinPredicateEvalCost);
+    long buildCardinality = Math.max(0, getChild(1).getCardinality());
+    ProcessingCost buildProcessingCost = ProcessingCost.basicCost(
+        getDisplayLabel() + " Build side", buildCardinality, 
eqJoinPredicateEvalCost);
     return Pair.create(probeProcessingCost, buildProcessingCost);
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 10f24a5a7..e91dee0a0 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -1000,8 +1000,12 @@ public abstract class JoinNode extends PlanNode {
    */
   public abstract Pair<ProcessingCost, ProcessingCost> 
computeJoinProcessingCost();
 
+  /**
+   * Get filtered cardinality of probe hand of join node.
+   * Sanitized unknown cardinality (-1) into 0.
+   */
   protected long getProbeCardinalityForCosting() {
-    return getChild(0).getFilteredCardinality();
+    return Math.max(0, getChild(0).getFilteredCardinality());
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index 414489e10..4bad6aee0 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -111,7 +111,7 @@ public class NestedLoopJoinNode extends JoinNode {
     // different costs here based on that RHS threshold.
     // We return the full cost in the first element of the Pair.
     long probeCardinality = getProbeCardinalityForCosting();
-    long buildCardinality = getChild(1).getCardinality();
+    long buildCardinality = Math.max(0, getChild(1).getCardinality());
     long cardProduct = checkedMultiply(probeCardinality, buildCardinality);
     long perInstanceBuildCardinality =
         (long) Math.ceil(buildCardinality / 
fragment_.getNumInstancesForCosting());
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 1809b6772..1c7f68a5f 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -1101,7 +1101,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     List<CostingSegment> costingSegments = rootSegment_.getNodesPreOrder();
     for (CostingSegment costingSegment : costingSegments) {
       ProcessingCost cost = costingSegment.getProcessingCost();
-      Preconditions.checkState(cost.isValid());
+      Preconditions.checkState(cost.isValid(), "Segment cost is invalid! %s", 
cost);
       Preconditions.checkState(
           cost.getNumInstancesExpected() == getAdjustedInstanceCount());
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 842e024d8..62c1a108d 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -1045,7 +1045,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
    */
   public void computeRowConsumptionAndProductionToCost() {
     Preconditions.checkState(processingCost_.isValid(),
-        "Processing cost of PlanNode " + getDisplayLabel() + " is invalid!");
+        "Processing cost of PlanNode %s is invalid! %s", getDisplayLabel(),
+        processingCost_);
     processingCost_.setNumRowToConsume(getInputCardinality());
     processingCost_.setNumRowToProduce(getCardinality());
   }
@@ -1279,6 +1280,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
     filteredCardinality_ = newCardinality;
   }
 
+  // May return -1.
   // TODO: merge this with getCardinality().
   protected long getFilteredCardinality() {
     return filteredCardinality_ > -1 ? filteredCardinality_ : getCardinality();
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java 
b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 8da761a32..df4784935 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -73,9 +73,9 @@ public class PlanRootSink extends DataSink {
     if (queryOptions.isSpool_query_results() && 
queryOptions.getScratch_limit() != 0
         && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) {
       // The processing cost to buffer these many rows in root.
-      processingCost_ =
-          ProcessingCost.basicCost(getLabel(), 
fragment_.getPlanRoot().getCardinality(),
-              ExprUtil.computeExprsTotalCost(outputExprs_));
+      long outputCardinality = Math.max(0, 
fragment_.getPlanRoot().getCardinality());
+      processingCost_ = ProcessingCost.basicCost(
+          getLabel(), outputCardinality, 
ExprUtil.computeExprsTotalCost(outputExprs_));
     } else {
       processingCost_ = ProcessingCost.zero();
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 29685f924..a66caf1f3 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -550,15 +550,23 @@ public class Planner {
   public static void computeProcessingCost(
       List<PlanFragment> planRoots, TQueryExecRequest request, PlannerContext 
planCtx) {
     Analyzer rootAnalyzer = planCtx.getRootAnalyzer();
-    if (!rootAnalyzer.getQueryOptions().isCompute_processing_cost()) {
-      request.setCores_required(-1);
-      return;
-    }
+    TQueryOptions queryOptions = rootAnalyzer.getQueryOptions();
 
     PlanFragment rootFragment = planRoots.get(0);
-    List<PlanFragment> postOrderFragments = rootFragment.getNodesPostOrder();
-    for (PlanFragment fragment : postOrderFragments) {
-      fragment.computeCostingSegment(rootAnalyzer.getQueryOptions());
+    List<PlanFragment> postOrderFragments = new ArrayList<>();
+    boolean testCostCalculation = queryOptions.isEnable_replan()
+        && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan());
+    if (queryOptions.isCompute_processing_cost() || testCostCalculation) {
+      postOrderFragments = rootFragment.getNodesPostOrder();
+      for (PlanFragment fragment : postOrderFragments) {
+        fragment.computeCostingSegment(queryOptions);
+      }
+    }
+
+    // Only do parallelism adjustment if COMPUTE_PROCESSING_COST is enabled.
+    if (!queryOptions.isCompute_processing_cost()) {
+      request.setCores_required(-1);
+      return;
     }
 
     if (LOG.isTraceEnabled()) {
@@ -571,7 +579,7 @@ public class Planner {
 
     computeEffectiveParallelism(postOrderFragments,
         rootAnalyzer.getMinParallelismPerNode(), 
rootAnalyzer.getMaxParallelismPerNode(),
-        rootAnalyzer.getQueryOptions());
+        queryOptions);
 
     // Count bounded core count. This is taken from final instance count from 
previous
     // step.
diff --git a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java 
b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
index c929b6166..e27a610b9 100644
--- a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
+++ b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
@@ -80,10 +80,6 @@ public abstract class ProcessingCost implements Cloneable {
         Math.max(0, cardinality), exprsCost, materializationCost);
   }
 
-  private static ProcessingCost computeValidBaseCost(double totalCost) {
-    return new BaseProcessingCost(totalCost);
-  }
-
   public static ProcessingCost basicCost(
       String label, long cardinality, float exprsCost, float 
materializationCost) {
     ProcessingCost processingCost =
@@ -99,10 +95,20 @@ public abstract class ProcessingCost implements Cloneable {
     return processingCost;
   }
 
+  /**
+   * Create new ProcessingCost.
+   * 'totalCost' must not be a negative, NaN, or infinite.
+   */
   public static ProcessingCost basicCost(String label, double totalCost) {
-    ProcessingCost processingCost = computeValidBaseCost(totalCost);
-    processingCost.setLabel(label);
-    return processingCost;
+    try {
+      ProcessingCost processingCost = new BaseProcessingCost(totalCost);
+      processingCost.setLabel(label);
+      return processingCost;
+    } catch (IllegalArgumentException ex) {
+      // Rethrow with label mentioned in the exception message.
+      throw new IllegalArgumentException(
+          String.format("Invalid totalCost supplied for %s", label), ex);
+    }
   }
 
   /**
diff --git 
a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java 
b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
index 421ee9e64..9c7e14465 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
@@ -26,7 +26,8 @@ public class ScaledProcessingCost extends ProcessingCost {
   private final long multiplier_;
 
   protected ScaledProcessingCost(ProcessingCost cost, long multiplier) {
-    Preconditions.checkArgument(cost.isValid(), "ScaledProcessingCost: cost is 
invalid!");
+    Preconditions.checkArgument(
+        cost.isValid(), "ScaledProcessingCost: cost is invalid! %s", cost);
     Preconditions.checkArgument(
         multiplier >= 0, "ScaledProcessingCost: multiplier must be 
non-negative!");
     cost_ = cost;
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 08d6c3129..9cb78b67e 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -345,6 +345,7 @@ abstract public class ScanNode extends PlanNode {
     return capInputCardinalityWithLimit(inputCardinality_);
   }
 
+  // May return -1.
   // TODO: merge this with getInputCardinality().
   public long getFilteredInputCardinality() {
     return capInputCardinalityWithLimit(
@@ -396,8 +397,6 @@ abstract public class ScanNode extends PlanNode {
    * number of scan ranges and related query options.
    */
   protected int computeMaxScannerThreadsForCPC(TQueryOptions queryOptions) {
-    Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
-
     // maxThread calculation below intentionally does not include core count 
from
     // executor group config. This is to allow scan fragment parallelism to 
scale
     // regardless of the core count limit.
@@ -417,8 +416,6 @@ abstract public class ScanNode extends PlanNode {
    * the return value of this method.
    */
   protected ProcessingCost computeScanProcessingCost(TQueryOptions 
queryOptions) {
-    Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
-
     int maxScannerThreads = computeMaxScannerThreadsForCPC(queryOptions);
     long inputCardinality = getFilteredInputCardinality();
 
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java 
b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index b82c24c8e..4d120818e 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -473,10 +473,12 @@ public class SortNode extends PlanNode {
     // TODO: Benchmark partial sort cost separately.
     // TODO: Improve this for larger spilling sorts.
     double totalCost = 0.0F;
-    long inputCardinality = getChild(0).getFilteredCardinality();
+    long inputCardinality = Math.max(0, getChild(0).getFilteredCardinality());
+    double log2InputCardinality =
+        inputCardinality <= 0 ? 0.0 : (Math.log(inputCardinality) / 
Math.log(2));
     if (type_ == TSortType.TOTAL || type_ == TSortType.PARTIAL) {
       if (avgRowSize_ <= 10) {
-        totalCost = inputCardinality * (Math.log(inputCardinality) / 
Math.log(2))
+        totalCost = inputCardinality * log2InputCardinality
             * COST_COEFFICIENT_SORT_TOTAL_SMALL_ROW;
       } else {
         double fullInputSize = inputCardinality * avgRowSize_;
@@ -487,8 +489,7 @@ public class SortNode extends PlanNode {
       Preconditions.checkState(
           type_ == TSortType.TOPN || type_ == TSortType.PARTITIONED_TOPN);
       // Benchmarked TopN sort costs were ~ NlogN rows.
-      totalCost = inputCardinality * (Math.log(inputCardinality) / Math.log(2))
-          * COST_COEFFICIENT_SORT_TOPN;
+      totalCost = inputCardinality * log2InputCardinality * 
COST_COEFFICIENT_SORT_TOPN;
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Sort CPU cost estimate: " + totalCost + ", Type: " + type_
diff --git a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java 
b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
index fcbd6dfab..f567c8696 100644
--- a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
+++ b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
@@ -26,8 +26,10 @@ public class SumProcessingCost extends ProcessingCost {
   private final ProcessingCost cost2_;
 
   protected SumProcessingCost(ProcessingCost cost1, ProcessingCost cost2) {
-    Preconditions.checkArgument(cost1.isValid(), "SumProcessingCost: cost1 is 
invalid!");
-    Preconditions.checkArgument(cost2.isValid(), "SumProcessingCost: cost2 is 
invalid!");
+    Preconditions.checkArgument(
+        cost1.isValid(), "SumProcessingCost: cost1 is invalid! %s", cost1);
+    Preconditions.checkArgument(
+        cost2.isValid(), "SumProcessingCost: cost2 is invalid! %s", cost2);
     cost1_ = cost1;
     cost2_ = cost2;
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java 
b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index 527dac825..5ff654abe 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -164,7 +164,7 @@ public class UnionNode extends PlanNode {
       PlanNode child = children_.get(i);
       if (child.cardinality_ >= 0) {
         totalMaterializedCardinality =
-            checkedAdd(totalMaterializedCardinality, child.cardinality_);
+            checkedAdd(totalMaterializedCardinality, Math.max(0, 
child.cardinality_));
       }
     }
     long estBytesMaterialized =
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java
index 7e8418c99..48eb56bfd 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java
@@ -20,13 +20,25 @@ package org.apache.impala.analysis;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.RuntimeEnv;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
  * Tests computeNumDistinctValues() estimates for Exprs
  */
 public class ExprNdvTest extends FrontendTestBase {
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(true);
+  }
+
+  @AfterClass
+  public static void cleanUpClass() {
+    RuntimeEnv.INSTANCE.reset();
+  }
 
   public void verifyNdv(String expr, long expectedNdv)
       throws ImpalaException {
diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java 
b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
index ae97fdef7..a6807c2e3 100644
--- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
@@ -18,6 +18,7 @@
 package org.apache.impala.planner;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
@@ -25,11 +26,14 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.QueryConstants;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
@@ -42,6 +46,16 @@ public class CardinalityTest extends PlannerTestBase {
 
   private static double CARDINALITY_TOLERANCE = 0.05;
 
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(true);
+  }
+
+  @AfterClass
+  public static void cleanUpClass() {
+    RuntimeEnv.INSTANCE.reset();
+  }
+
   /**
    * Test the happy path: table with stats, no all-null cols.
    */
@@ -1117,6 +1131,9 @@ public class CardinalityTest extends PlannerTestBase {
     // the distributed plan).
     PlanNode currentNode = plan.get(plan.size() - 1).getPlanRoot();
     for (Integer currentChildIndex: path) {
+      assertTrue(currentNode.getDisplayLabel() + " does not have child index "
+              + currentChildIndex,
+          currentNode.hasChild(currentChildIndex));
       currentNode = currentNode.getChild(currentChildIndex);
     }
     assertEquals("PlanNode class not matched: ", cl.getName(),
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
index 94633f8e0..ded736b47 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
@@ -200,3 +200,81 @@ max-parallelism=1 segment-costs=[28]
    tuple-ids=0 row-size=12B cardinality=20 cost=3
    in pipelines: 00(GETNEXT)
 ====
+# IMPALA-13152: Regression test for TOP-N query over empty table.
+select field, rk from (
+select
+  field, f2,
+  row_number()
+      over (partition by field order by f2) rk
+from functional.emptytable
+) b
+where rk = 1;
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=48.00MB Threads=4
+Per-Host Resource Estimates: Memory=48MB
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.06MB mem-reservation=4.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[0] cpu-comparison-result=2 [max(1 (self) 
vs 2 (sum children))]
+PLAN-ROOT SINK
+|  output exprs: field, row_number()
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=0
+|
+06:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=64.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4,3 row-size=24B cardinality=0 cost=0
+|  in pipelines: 05(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(field)] hosts=1 instances=2 (adjusted from 1)
+Per-Instance Resources: mem-estimate=16.12MB mem-reservation=16.00MB 
thread-reservation=1
+max-parallelism=1 segment-costs=[0, 0] cpu-comparison-result=2 [max(2 (self) 
vs 1 (sum children))]
+03:SELECT
+|  predicates: row_number() = CAST(1 AS BIGINT)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=4,3 row-size=24B cardinality=0 cost=0
+|  in pipelines: 05(GETNEXT)
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: field
+|  order by: f2 ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=4,3 row-size=24B cardinality=0 cost=0
+|  in pipelines: 05(GETNEXT)
+|
+05:TOP-N
+|  partition by: field
+|  order by: f2 ASC
+|  partition limit: 1
+|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=4 row-size=16B cardinality=0 cost=0
+|  in pipelines: 05(GETNEXT), 01(OPEN)
+|
+04:EXCHANGE [HASH(field)]
+|  mem-estimate=20.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4 row-size=16B cardinality=0 cost=0
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Instance Resources: mem-estimate=12.16MB mem-reservation=12.00MB 
thread-reservation=1
+max-parallelism=1 segment-costs=[0, 0]
+01:TOP-N
+|  partition by: field
+|  order by: f2 ASC
+|  partition limit: 1
+|  source expr: row_number() = CAST(1 AS BIGINT)
+|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=4 row-size=16B cardinality=0 cost=0
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional.emptytable, RANDOM]
+   partitions=0/0 files=0 size=0B
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/0 rows=0
+     columns missing stats: field
+   extrapolated-rows=disabled max-scan-range-rows=0
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=0 cost=0
+   in pipelines: 00(GETNEXT)
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test
index 1c1293a91..d2c0cbe9e 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test
@@ -30,29 +30,29 @@ SumCost: cost-total=956 max-instances=1 adj-instances=1 
cost/inst=956 #cons:#pro
   PLAN-ROOT SINK
   |  output exprs: s_store_name, s_store_id, sum(CASE WHEN (d_day_name = 
'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 
'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 
'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 
'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 
'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 
'Friday') THEN ss_sales_price ELSE NULL END [...]
   |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=900
-  |  cost-total=900 max-instances=1 adj-instances=1 cost/inst=900 
#cons:#prod=100:100 reduction=1.0 cost/cons=9.0 cost/prod=9.0 total-cost=900.0 
cardinality=100
+  |  cost-total=900 max-instances=1 adj-instances=1 cost/inst=900 
#cons:#prod=100:100 reduction=1.0 cost/cons=9.0 cost/prod=9.0 raw-cost=900.0 
cardinality=100
   |
   11:MERGING-EXCHANGE [UNPARTITIONED]
      order by: s_store_name ASC, s_store_id ASC, sum(CASE WHEN (d_day_name = 
'Sunday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 
'Monday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 
'Tuesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 
'Wednesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name 
= 'Thursday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name 
= 'Friday') THEN ss_s [...]
      limit: 100
      mem-estimate=157.71KB mem-reservation=0B thread-reservation=0
-     cost-total=56 max-instances=1 adj-instances=1 cost/inst=56 
#cons:#prod=100:100 reduction=1.0 cost/cons=0.56 cost/prod=0.56 
total-cost=56.476
+     cost-total=56 max-instances=1 adj-instances=1 cost/inst=56 
#cons:#prod=100:100 reduction=1.0 cost/cons=0.56 cost/prod=0.56 raw-cost=56.476
      tuple-ids=4 row-size=156B cardinality=100 cost=56
      in pipelines: 06(GETNEXT)
 
 F03:PLAN FRAGMENT [HASH(s_store_name,s_store_id)] hosts=10 instances=10 
(adjusted from 120)
 Per-Instance Resources: mem-estimate=28.84MB mem-reservation=1.94MB 
thread-reservation=1
 max-parallelism=10 segment-costs=[48483, 48650, 439] cpu-comparison-result=120 
[max(10 (self) vs 120 (sum children))]
-cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 
#cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 
total-cost=439.9666
-cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 
#cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 
total-cost=48650.03791799733
+cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 
#cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 
raw-cost=439.9666
+cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 
#cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 
raw-cost=48650.03791799733
 SumCost: cost-total=48483 max-instances=1 adj-instances=10 cost/inst=4849 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=7.150885 cost/prod=7.150885
   DATASTREAM SINK [FRAGMENT=F04, EXCHANGE=11, UNPARTITIONED]
   |  mem-estimate=639.76KB mem-reservation=0B thread-reservation=0 cost=439
-  |  cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 
#cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 
total-cost=439.9666
+  |  cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 
#cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 
raw-cost=439.9666
   06:TOP-N [LIMIT=100]
   |  order by: s_store_name ASC, s_store_id ASC, sum(CASE WHEN (d_day_name = 
'Sunday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 
'Monday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 
'Tuesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 
'Wednesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name 
= 'Thursday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name 
= 'Friday') THEN ss_s [...]
   |  mem-estimate=15.23KB mem-reservation=0B thread-reservation=0
-  |  cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 
#cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 
total-cost=48650.03791799733
+  |  cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 
#cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 
raw-cost=48650.03791799733
   |  tuple-ids=4 row-size=156B cardinality=100 cost=48650
   |  in pipelines: 06(GETNEXT), 10(OPEN)
   |
@@ -66,7 +66,7 @@ SumCost: cost-total=48483 max-instances=1 adj-instances=10 
cost/inst=4849 #cons:
   |
   09:EXCHANGE [HASH(s_store_name,s_store_id)]
      mem-estimate=18.84MB mem-reservation=0B thread-reservation=0
-     cost-total=5616 max-instances=1 adj-instances=10 cost/inst=562 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=0.8283186 cost/prod=0.8283186 
total-cost=5616.677
+     cost-total=5616 max-instances=1 adj-instances=10 cost/inst=562 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=0.8283186 cost/prod=0.8283186 
raw-cost=5616.677
      tuple-ids=3 row-size=156B cardinality=6.78K cost=5616
      in pipelines: 01(GETNEXT)
 
@@ -74,11 +74,11 @@ F00:PLAN FRAGMENT [RANDOM] hosts=10 instances=120
 Per-Host Shared Resources: mem-estimate=2.00MB mem-reservation=2.00MB 
thread-reservation=0 runtime-filters-memory=2.00MB
 Per-Instance Resources: mem-estimate=32.25MB mem-reservation=11.00MB 
thread-reservation=1
 max-parallelism=330 segment-costs=[3290070641, 71580] 
cpu-comparison-result=120 [max(120 (self) vs 22 (sum children))]
-cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 
total-cost=71580.922
+cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 
raw-cost=71580.922
 SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 
cost/inst=27417254 #cons:#prod=8639935193:888242617 reduction=9.726999 
cost/cons=0.38079804 cost/prod=3.7040224
   DATASTREAM SINK [FRAGMENT=F03, EXCHANGE=09, HASH(s_store_name,s_store_id)]
   |  mem-estimate=6.25MB mem-reservation=0B thread-reservation=0 cost=71580
-  |  cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 
total-cost=71580.922
+  |  cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 
#cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 
raw-cost=71580.922
   05:AGGREGATE [STREAMING]
   |  output: sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE 
NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL 
END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL 
END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL 
END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL 
END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), 
sum(CASE WHEN (d_day_name = ' [...]
   |  group by: s_store_name, s_store_id
@@ -92,7 +92,7 @@ SumCost: cost-total=3290070641 max-instances=330 
adj-instances=120 cost/inst=274
   |  hash predicates: ss_store_sk = s_store_sk
   |  fk/pk conjuncts: ss_store_sk = s_store_sk
   |  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB 
thread-reservation=0
-  |  cost-total=614141162 max-instances=62 adj-instances=120 cost/inst=5117843 
#cons:#prod=1766829189:888242617 reduction=1.9891291 cost/cons=0.3475951 
cost/prod=0.6914115 total-cost=6.141411629949E8
+  |  cost-total=614141162 max-instances=62 adj-instances=120 cost/inst=5117843 
#cons:#prod=1766829189:888242617 reduction=1.9891291 cost/cons=0.3475951 
cost/prod=0.6914115 raw-cost=6.141411629949E8
   |  tuple-ids=1,0,2 row-size=91B cardinality=888.24M cost=614141162
   |  in pipelines: 01(GETNEXT), 02(OPEN)
   |
@@ -101,7 +101,7 @@ SumCost: cost-total=3290070641 max-instances=330 
adj-instances=120 cost/inst=274
   |  hash predicates: ss_sold_date_sk = d_date_sk
   |  fk/pk conjuncts: ss_sold_date_sk = d_date_sk
   |  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB 
thread-reservation=0
-  |  cost-total=773340988 max-instances=78 adj-instances=120 cost/inst=6444509 
#cons:#prod=8639935566:1766828853 reduction=4.890081 cost/cons=0.08950773 
cost/prod=0.4377 total-cost=7.733409889581001E8
+  |  cost-total=773340988 max-instances=78 adj-instances=120 cost/inst=6444509 
#cons:#prod=8639935566:1766828853 reduction=4.890081 cost/cons=0.08950773 
cost/prod=0.4377 raw-cost=7.733409889581001E8
   |  tuple-ids=1,0 row-size=39B cardinality=1.77G cost=773340988
   |  in pipelines: 01(GETNEXT), 00(OPEN)
   |
@@ -115,7 +115,7 @@ SumCost: cost-total=3290070641 max-instances=330 
adj-instances=120 cost/inst=274
      extrapolated-rows=disabled max-scan-range-rows=390.22M 
est-scan-range=374(filtered from 1824)
      file formats: [PARQUET]
      mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=0
-     cost-total=305308025 max-instances=31 adj-instances=120 cost/inst=2544234 
#cons:#prod=8639935193:8639935193 reduction=1.0 cost/cons=0.03533684 
cost/prod=0.03533684 total-cost=3.053080257984E8
+     cost-total=305308025 max-instances=31 adj-instances=120 cost/inst=2544234 
#cons:#prod=8639935193:8639935193 reduction=1.0 cost/cons=0.03533684 
cost/prod=0.03533684 raw-cost=3.053080257984E8
      tuple-ids=1 row-size=12B cardinality=1.77G(filtered from 8.64G) 
cost=305308025
      in pipelines: 01(GETNEXT)
 
@@ -128,7 +128,7 @@ SumCost: cost-total=776 max-instances=1 adj-instances=10 
cost/inst=78 #cons:#pro
   |  build expressions: s_store_sk
   |  runtime filters: RF000[bloom] <- s_store_sk, RF001[min_max] <- s_store_sk
   |  mem-estimate=23.25MB mem-reservation=23.25MB spill-buffer=64.00KB 
thread-reservation=0 cost=336
-  |  cost-total=336 max-instances=1 adj-instances=10 cost/inst=34 
#cons:#prod=336:336 reduction=1.0 cost/cons=1.0 cost/prod=1.0 total-cost=336.0
+  |  cost-total=336 max-instances=1 adj-instances=10 cost/inst=34 
#cons:#prod=336:336 reduction=1.0 cost/cons=1.0 cost/prod=1.0 raw-cost=336.0
   |
   08:EXCHANGE [BROADCAST]
      mem-estimate=35.40KB mem-reservation=0B thread-reservation=0
@@ -142,7 +142,7 @@ max-parallelism=1 segment-costs=[635]
 SumCost: cost-total=635 max-instances=1 adj-instances=1 cost/inst=635 
#cons:#prod=1350:336 reduction=4.017857 cost/cons=0.47037038 cost/prod=1.8898809
   DATASTREAM SINK [FRAGMENT=F05, EXCHANGE=08, BROADCAST]
   |  mem-estimate=223.76KB mem-reservation=0B thread-reservation=0 cost=54
-  |  cost-total=54 max-instances=1 adj-instances=1 cost/inst=54 
#cons:#prod=336:336 reduction=1.0 cost/cons=0.16071428 cost/prod=0.16071428 
total-cost=54.3753
+  |  cost-total=54 max-instances=1 adj-instances=1 cost/inst=54 
#cons:#prod=336:336 reduction=1.0 cost/cons=0.16071428 cost/prod=0.16071428 
raw-cost=54.3753
   02:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
      HDFS partitions=1/1 files=1 size=9.81KB
      predicates: s_gmt_offset = CAST(-6 AS DECIMAL(3,0))
@@ -154,7 +154,7 @@ SumCost: cost-total=635 max-instances=1 adj-instances=1 
cost/inst=635 #cons:#pro
      parquet dictionary predicates: s_gmt_offset = CAST(-6 AS DECIMAL(3,0))
      file formats: [PARQUET]
      mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
-     cost-total=581 max-instances=1 adj-instances=1 cost/inst=581 
#cons:#prod=1350:336 reduction=4.017857 cost/cons=0.43037036 
cost/prod=1.7291666 total-cost=581.0742
+     cost-total=581 max-instances=1 adj-instances=1 cost/inst=581 
#cons:#prod=1350:336 reduction=4.017857 cost/cons=0.43037036 
cost/prod=1.7291666 raw-cost=581.0742
      tuple-ids=2 row-size=52B cardinality=336 cost=581
      in pipelines: 02(GETNEXT)
 
@@ -167,7 +167,7 @@ SumCost: cost-total=863 max-instances=1 adj-instances=10 
cost/inst=87 #cons:#pro
   |  build expressions: d_date_sk
   |  runtime filters: RF002[bloom] <- d_date_sk
   |  mem-estimate=23.25MB mem-reservation=23.25MB spill-buffer=64.00KB 
thread-reservation=0 cost=373
-  |  cost-total=373 max-instances=1 adj-instances=10 cost/inst=38 
#cons:#prod=373:373 reduction=1.0 cost/cons=1.0 cost/prod=1.0 total-cost=373.0
+  |  cost-total=373 max-instances=1 adj-instances=10 cost/inst=38 
#cons:#prod=373:373 reduction=1.0 cost/cons=1.0 cost/prod=1.0 raw-cost=373.0
   |
   07:EXCHANGE [BROADCAST]
      mem-estimate=21.23KB mem-reservation=0B thread-reservation=0
@@ -181,7 +181,7 @@ max-parallelism=1 segment-costs=[18016]
 SumCost: cost-total=18016 max-instances=1 adj-instances=1 cost/inst=18016 
#cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24662897 
cost/prod=48.300266
   DATASTREAM SINK [FRAGMENT=F06, EXCHANGE=07, BROADCAST]
   |  mem-estimate=124.57KB mem-reservation=0B thread-reservation=0 cost=35
-  |  cost-total=35 max-instances=1 adj-instances=1 cost/inst=35 
#cons:#prod=373:373 reduction=1.0 cost/cons=0.09383378 cost/prod=0.09383378 
total-cost=35.391600000000004
+  |  cost-total=35 max-instances=1 adj-instances=1 cost/inst=35 
#cons:#prod=373:373 reduction=1.0 cost/cons=0.09383378 cost/prod=0.09383378 
raw-cost=35.391600000000004
   00:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
      HDFS partitions=1/1 files=1 size=2.17MB
      predicates: d_year = CAST(1999 AS INT)
@@ -193,7 +193,7 @@ SumCost: cost-total=18016 max-instances=1 adj-instances=1 
cost/inst=18016 #cons:
      parquet dictionary predicates: d_year = CAST(1999 AS INT)
      file formats: [PARQUET]
      mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
-     cost-total=17981 max-instances=1 adj-instances=1 cost/inst=17981 
#cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24614984 
cost/prod=48.206436 total-cost=17981.5393
+     cost-total=17981 max-instances=1 adj-instances=1 cost/inst=17981 
#cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24614984 
cost/prod=48.206436 raw-cost=17981.5393
      tuple-ids=0 row-size=27B cardinality=373 cost=17981
      in pipelines: 00(GETNEXT)
 ====

Reply via email to