This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5d1bd80623324f829aca604b25d97ace21f51417 Author: Riza Suminto <[email protected]> AuthorDate: Tue Jun 11 09:50:15 2024 -0700 IMPALA-13152: Avoid NaN, infinite, and negative ProcessingCost TOP-N cost will turn into NaN if inputCardinality is equal to 0 due to Math.log(inputCardinality). This patch fixes the issue by avoiding Math.log(0) and replacing it with 0 instead. After this patch, instantiating BaseProcessingCost with a NaN, infinite, or negative totalCost will throw an IllegalArgumentException. In BaseProcessingCost.getDetails(), "total-cost" is renamed to "raw-cost" to avoid confusion with "cost-total" in ProcessingCost.getDetails(). Testing: - Add a test case that runs a TOP-N query over an empty table. - Compute ProcessingCost in most FE and EE tests even when the COMPUTE_PROCESSING_COST option is not enabled, by checking whether the ENABLE_REPLAN option is enabled and either RuntimeEnv.INSTANCE.isTestEnv() is true or the TEST_REPLAN option is enabled. - Pass core tests. Change-Id: Ib49c7ae397dadcb2cb69fde1850d442d33cdf177 Reviewed-on: http://gerrit.cloudera.org:8080/21504 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- common/thrift/ImpalaService.thrift | 4 +- .../org/apache/impala/analysis/AggregateInfo.java | 4 ++ .../org/apache/impala/planner/AggregationNode.java | 10 +-- .../apache/impala/planner/BaseProcessingCost.java | 8 ++- .../impala/planner/BroadcastProcessingCost.java | 2 +- .../org/apache/impala/planner/CostingSegment.java | 3 +- .../java/org/apache/impala/planner/DataSink.java | 6 +- .../org/apache/impala/planner/DataStreamSink.java | 2 +- .../apache/impala/planner/DistributedPlanner.java | 1 - .../org/apache/impala/planner/ExchangeNode.java | 2 +- .../org/apache/impala/planner/HashJoinNode.java | 9 +-- .../org/apache/impala/planner/HdfsScanNode.java | 3 +- .../org/apache/impala/planner/HdfsTableSink.java | 2 +- 
.../apache/impala/planner/IcebergDeleteNode.java | 6 +- .../java/org/apache/impala/planner/JoinNode.java | 6 +- 
.../apache/impala/planner/NestedLoopJoinNode.java | 2 +- .../org/apache/impala/planner/PlanFragment.java | 2 +- .../java/org/apache/impala/planner/PlanNode.java | 4 +- .../org/apache/impala/planner/PlanRootSink.java | 6 +- .../java/org/apache/impala/planner/Planner.java | 24 ++++--- .../org/apache/impala/planner/ProcessingCost.java | 20 ++++-- .../impala/planner/ScaledProcessingCost.java | 3 +- .../java/org/apache/impala/planner/ScanNode.java | 5 +- .../java/org/apache/impala/planner/SortNode.java | 9 +-- .../apache/impala/planner/SumProcessingCost.java | 6 +- .../java/org/apache/impala/planner/UnionNode.java | 2 +- .../org/apache/impala/analysis/ExprNdvTest.java | 12 ++++ .../org/apache/impala/planner/CardinalityTest.java | 17 +++++ .../queries/PlannerTest/tpcds-processing-cost.test | 78 ++++++++++++++++++++++ .../tpcds_cpu_cost/tpcds-q43-verbose.test | 36 +++++----- 30 files changed, 218 insertions(+), 76 deletions(-) diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index fdf7710dc..3884d2fd1 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -736,7 +736,9 @@ enum TImpalaQueryOptions { // If true, replanning is enabled. ENABLE_REPLAN = 143; - // If true, test replan by imposing artificial two executor groups in FE. + // If true, test replan by imposing artificial two executor groups in FE and always + // compute ProcessingCost. The degree of parallelism adjustment, however, still require + // COMPUTE_PROCESSING_COST option set to true. TEST_REPLAN = 144; // Maximum wait time on HMS ACID lock in seconds. 
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java index 1eb5cc161..25cd8ebb8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java +++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java @@ -734,6 +734,10 @@ public class AggregateInfo extends AggregateInfoBase { public ProcessingCost computeProcessingCost( String label, long inputCardinality, long intermediateOutputCardinality) { + Preconditions.checkArgument( + inputCardinality >= 0, "inputCardinality should not be negative!"); + Preconditions.checkArgument(intermediateOutputCardinality >= 0, + "intermediateOutputCardinality should not be negative!"); // Benchmarking suggests we can estimate the processing cost as a linear function // based the probe input cardinality, the "intermediate" output cardinality, and // an incremental cost per input row for each additional aggregate function. diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index c48e8fbba..aa0a5ce5c 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -575,13 +575,15 @@ public class AggregationNode extends PlanNode { // duplicate keys across fragments. Calculate an overall "intermediate" output // cardinality that attempts to account for the dups. Cap it at the input // cardinality because an aggregation cannot increase the cardinality. 
- long inputCardinality = getAggClassNumGroup(prevAgg, aggInfo); + long inputCardinality = Math.max(0, getAggClassNumGroup(prevAgg, aggInfo)); long perInstanceNdv = fragment_.getPerInstanceNdvForCpuCosting( inputCardinality, aggInfo.getGroupingExprs()); - long intermediateOutputCardinality = Math.min( - inputCardinality, perInstanceNdv * fragment_.getNumInstancesForCosting()); + long intermediateOutputCardinality = Math.max(0, + Math.min( + inputCardinality, perInstanceNdv * fragment_.getNumInstancesForCosting())); + long aggClassNumGroup = Math.max(0, getAggClassNumGroup(prevAgg, aggInfo)); ProcessingCost aggCost = aggInfo.computeProcessingCost(getDisplayLabel(), - getAggClassNumGroup(prevAgg, aggInfo), intermediateOutputCardinality); + aggClassNumGroup, intermediateOutputCardinality); processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost); } } diff --git a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java index 104205986..67d189da9 100644 --- a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java +++ b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java @@ -17,6 +17,8 @@ package org.apache.impala.planner; +import com.google.common.base.Preconditions; + /** * A basic implementation of {@link ProcessingCost} that takes account expression cost * and average row size as per-row costing weight. 
@@ -42,6 +44,10 @@ public class BaseProcessingCost extends ProcessingCost { } public BaseProcessingCost(double totalCost) { + Preconditions.checkArgument(!Double.isNaN(totalCost), "totalCost must not be a NaN!"); + Preconditions.checkArgument( + Double.isFinite(totalCost), "totalCost must be a finite double!"); + Preconditions.checkArgument(totalCost >= 0, "totalCost must not be a negative!"); cardinality_ = 0L; exprsCost_ = 0.0F; materializationCost_ = 0.0F; @@ -74,7 +80,7 @@ public class BaseProcessingCost extends ProcessingCost { public String getDetails() { StringBuilder output = new StringBuilder(); output.append(super.getDetails()); - output.append(" total-cost=").append(totalCost_); + output.append(" raw-cost=").append(totalCost_); if (cardinality_ != 0L) { output.append(" cardinality=").append(cardinality_); } return output.toString(); } diff --git a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java index d031ecea6..6f214c1ef 100644 --- a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java +++ b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java @@ -36,7 +36,7 @@ public class BroadcastProcessingCost extends ProcessingCost { protected BroadcastProcessingCost( ProcessingCost cost, Supplier<Integer> countSupplier) { Preconditions.checkArgument( - cost.isValid(), "BroadcastProcessingCost: cost is invalid!"); + cost.isValid(), "BroadcastProcessingCost: cost is invalid! 
%s", cost); childProcessingCost_ = cost; setNumInstanceExpected(countSupplier); } diff --git a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java index 634eea302..e7c884be2 100644 --- a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java +++ b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java @@ -120,7 +120,8 @@ public class CostingSegment extends TreeNode<CostingSegment> { } private void appendCost(ProcessingCost additionalCost) { - Preconditions.checkArgument(additionalCost.isValid()); + Preconditions.checkArgument( + additionalCost.isValid(), "additionalCost is invalid! %s", additionalCost); ProcessingCost newTotalCost = ProcessingCost.sumCost(additionalCost, cost_); newTotalCost.setNumRowToConsume(cost_.getNumRowToConsume()); newTotalCost.setNumRowToProduce(additionalCost.getNumRowToConsume()); diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java index 2cda96a02..97571207d 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java @@ -138,9 +138,9 @@ public abstract class DataSink { */ public void computeRowConsumptionAndProductionToCost() { Preconditions.checkState(processingCost_.isValid(), - "Processing cost of DataSink " + fragment_.getId() + ":" + getLabel() - + " is invalid!"); - long inputOutputCardinality = fragment_.getPlanRoot().getCardinality(); + "Processing cost of DataSink %s:%s is invalid! 
%s", fragment_.getId(), getLabel(), + processingCost_); + long inputOutputCardinality = Math.max(0, fragment_.getPlanRoot().getCardinality()); processingCost_.setNumRowToConsume(inputOutputCardinality); processingCost_.setNumRowToProduce(inputOutputCardinality); } diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java index 29edd9e7a..5ea939a7c 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java @@ -100,7 +100,7 @@ public class DataStreamSink extends DataSink { public void computeProcessingCost(TQueryOptions queryOptions) { // The sending part of the processing cost for the exchange node. - long outputCardinality = exchNode_.getFilteredCardinality(); + long outputCardinality = Math.max(0, exchNode_.getFilteredCardinality()); long outputSize = (long) (exchNode_.getAvgDeserializedRowSize() * outputCardinality); double totalCost = 0.0; String exchType; diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index e984b8615..42da9621e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -20,7 +20,6 @@ package org.apache.impala.planner; import java.util.ArrayList; import java.util.List; -import org.apache.impala.analysis.AnalysisContext; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.DmlStatementBase; diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index cde1d2d49..9f24c5809 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -253,7 +253,7 @@ 
public class ExchangeNode extends PlanNode { // bottom sending fragment; // 2. The receiving processing cost in the top receiving fragment which is computed // here. - long inputCardinality = getChild(0).getFilteredCardinality(); + long inputCardinality = Math.max(0, getChild(0).getFilteredCardinality()); // It's not obvious whether the per-byte CPU costs are more accurately estimated // using the serialized or deserialized sizes, but the coefficients were determined diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java index 766e6fac7..96cae6785 100644 --- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java @@ -339,12 +339,13 @@ public class HashJoinNode extends JoinNode { // Compute the processing cost for lhs. Benchmarking suggests we can estimate the // probe cost as a linear function combining the probe input cardinality and the // estimated output cardinality. + long outputCardinality = Math.max(0, getCardinality()); double totalProbeCost = (getProbeCardinalityForCosting() * COST_COEFFICIENT_PROBE_INPUT) - + (getCardinality() * COST_COEFFICIENT_HASH_JOIN_OUTPUT); + + (outputCardinality * COST_COEFFICIENT_HASH_JOIN_OUTPUT); if (LOG.isTraceEnabled()) { LOG.trace("Probe CPU cost estimate: " + totalProbeCost + ", Input Card: " - + getProbeCardinalityForCosting() + ", Output Card: " + getCardinality()); + + getProbeCardinalityForCosting() + ", Output Card: " + outputCardinality); } ProcessingCost probeProcessingCost = ProcessingCost.basicCost(getDisplayLabel(), totalProbeCost); @@ -357,8 +358,8 @@ public class HashJoinNode extends JoinNode { // build fragment count is fixed for broadcast(at num hosts) regardless of the cost // computed here. But we should clean up the costing here to avoid any future // confusion. 
- double totalBuildCost = - getChild(1).getFilteredCardinality() * COST_COEFFICIENT_BUILD_INPUT; + long buildCardinality = Math.max(0, getChild(1).getFilteredCardinality()); + double totalBuildCost = buildCardinality * COST_COEFFICIENT_BUILD_INPUT; ProcessingCost buildProcessingCost = ProcessingCost.basicCost(getDisplayLabel() + " Build side", totalBuildCost); return Pair.create(probeProcessingCost, buildProcessingCost); diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 23287e176..593168bc6 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -2228,8 +2228,6 @@ public class HdfsScanNode extends ScanNode { */ @Override protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) { - Preconditions.checkArgument(queryOptions.isCompute_processing_cost()); - long inputCardinality = getFilteredInputCardinality(); long estBytes = 0L; double bytesCostCoefficient = 0.0; @@ -2255,6 +2253,7 @@ public class HdfsScanNode extends ScanNode { } return ProcessingCost.basicCost(getDisplayLabel(), totalCost); } else { + // Input cardinality is 0 or unknown. Fallback to superclass. 
return super.computeScanProcessingCost(queryOptions); } } diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index d6460ae88..5ba8e8f85 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -158,7 +158,7 @@ public class HdfsTableSink extends TableSink { @Override public void computeProcessingCost(TQueryOptions queryOptions) { PlanNode inputNode = fragment_.getPlanRoot(); - long cardinality = inputNode.getCardinality(); + long cardinality = Math.max(0, inputNode.getCardinality()); float avgRowDataSize = inputNode.getAvgRowSizeWithoutPad(); long estBytesInserted = (long) Math.ceil(avgRowDataSize * (double) cardinality); double totalCost = 0.0F; diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java index 2da88274b..5830d5927 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java @@ -228,9 +228,9 @@ public class IcebergDeleteNode extends JoinNode { getProbeCardinalityForCosting(), eqJoinPredicateEvalCost); // Compute the processing cost for rhs. 
- ProcessingCost buildProcessingCost = - ProcessingCost.basicCost(getDisplayLabel() + " Build side", - getChild(1).getCardinality(), eqJoinPredicateEvalCost); + long buildCardinality = Math.max(0, getChild(1).getCardinality()); + ProcessingCost buildProcessingCost = ProcessingCost.basicCost( + getDisplayLabel() + " Build side", buildCardinality, eqJoinPredicateEvalCost); return Pair.create(probeProcessingCost, buildProcessingCost); } } diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index ab9f05227..c1b95953f 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -1004,8 +1004,12 @@ public abstract class JoinNode extends PlanNode { */ public abstract Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost(); + /** + * Get filtered cardinality of probe hand of join node. + * Sanitized unknown cardinality (-1) into 0. + */ protected long getProbeCardinalityForCosting() { - return getChild(0).getFilteredCardinality(); + return Math.max(0, getChild(0).getFilteredCardinality()); } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java index 414489e10..4bad6aee0 100644 --- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java @@ -111,7 +111,7 @@ public class NestedLoopJoinNode extends JoinNode { // different costs here based on that RHS threshold. // We return the full cost in the first element of the Pair. 
long probeCardinality = getProbeCardinalityForCosting(); - long buildCardinality = getChild(1).getCardinality(); + long buildCardinality = Math.max(0, getChild(1).getCardinality()); long cardProduct = checkedMultiply(probeCardinality, buildCardinality); long perInstanceBuildCardinality = (long) Math.ceil(buildCardinality / fragment_.getNumInstancesForCosting()); diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index 1809b6772..1c7f68a5f 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -1101,7 +1101,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { List<CostingSegment> costingSegments = rootSegment_.getNodesPreOrder(); for (CostingSegment costingSegment : costingSegments) { ProcessingCost cost = costingSegment.getProcessingCost(); - Preconditions.checkState(cost.isValid()); + Preconditions.checkState(cost.isValid(), "Segment cost is invalid! %s", cost); Preconditions.checkState( cost.getNumInstancesExpected() == getAdjustedInstanceCount()); } diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 842e024d8..62c1a108d 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -1045,7 +1045,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> { */ public void computeRowConsumptionAndProductionToCost() { Preconditions.checkState(processingCost_.isValid(), - "Processing cost of PlanNode " + getDisplayLabel() + " is invalid!"); + "Processing cost of PlanNode %s is invalid! 
%s", getDisplayLabel(), + processingCost_); processingCost_.setNumRowToConsume(getInputCardinality()); processingCost_.setNumRowToProduce(getCardinality()); } @@ -1279,6 +1280,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { filteredCardinality_ = newCardinality; } + // May return -1. // TODO: merge this with getCardinality(). protected long getFilteredCardinality() { return filteredCardinality_ > -1 ? filteredCardinality_ : getCardinality(); diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java index 8da761a32..df4784935 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java @@ -73,9 +73,9 @@ public class PlanRootSink extends DataSink { if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0 && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) { // The processing cost to buffer these many rows in root. 
- processingCost_ = - ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(), - ExprUtil.computeExprsTotalCost(outputExprs_)); + long outputCardinality = Math.max(0, fragment_.getPlanRoot().getCardinality()); + processingCost_ = ProcessingCost.basicCost( + getLabel(), outputCardinality, ExprUtil.computeExprsTotalCost(outputExprs_)); } else { processingCost_ = ProcessingCost.zero(); } diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 29685f924..a66caf1f3 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -550,15 +550,23 @@ public class Planner { public static void computeProcessingCost( List<PlanFragment> planRoots, TQueryExecRequest request, PlannerContext planCtx) { Analyzer rootAnalyzer = planCtx.getRootAnalyzer(); - if (!rootAnalyzer.getQueryOptions().isCompute_processing_cost()) { - request.setCores_required(-1); - return; - } + TQueryOptions queryOptions = rootAnalyzer.getQueryOptions(); PlanFragment rootFragment = planRoots.get(0); - List<PlanFragment> postOrderFragments = rootFragment.getNodesPostOrder(); - for (PlanFragment fragment : postOrderFragments) { - fragment.computeCostingSegment(rootAnalyzer.getQueryOptions()); + List<PlanFragment> postOrderFragments = new ArrayList<>(); + boolean testCostCalculation = queryOptions.isEnable_replan() + && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan()); + if (queryOptions.isCompute_processing_cost() || testCostCalculation) { + postOrderFragments = rootFragment.getNodesPostOrder(); + for (PlanFragment fragment : postOrderFragments) { + fragment.computeCostingSegment(queryOptions); + } + } + + // Only do parallelism adjustment if COMPUTE_PROCESSING_COST is enabled. 
+ if (!queryOptions.isCompute_processing_cost()) { + request.setCores_required(-1); + return; } if (LOG.isTraceEnabled()) { @@ -571,7 +579,7 @@ public class Planner { computeEffectiveParallelism(postOrderFragments, rootAnalyzer.getMinParallelismPerNode(), rootAnalyzer.getMaxParallelismPerNode(), - rootAnalyzer.getQueryOptions()); + queryOptions); // Count bounded core count. This is taken from final instance count from previous // step. diff --git a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java index c929b6166..e27a610b9 100644 --- a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java +++ b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java @@ -80,10 +80,6 @@ public abstract class ProcessingCost implements Cloneable { Math.max(0, cardinality), exprsCost, materializationCost); } - private static ProcessingCost computeValidBaseCost(double totalCost) { - return new BaseProcessingCost(totalCost); - } - public static ProcessingCost basicCost( String label, long cardinality, float exprsCost, float materializationCost) { ProcessingCost processingCost = @@ -99,10 +95,20 @@ public abstract class ProcessingCost implements Cloneable { return processingCost; } + /** + * Create new ProcessingCost. + * 'totalCost' must not be a negative, NaN, or infinite. + */ public static ProcessingCost basicCost(String label, double totalCost) { - ProcessingCost processingCost = computeValidBaseCost(totalCost); - processingCost.setLabel(label); - return processingCost; + try { + ProcessingCost processingCost = new BaseProcessingCost(totalCost); + processingCost.setLabel(label); + return processingCost; + } catch (IllegalArgumentException ex) { + // Rethrow with label mentioned in the exception message. 
+ throw new IllegalArgumentException( + String.format("Invalid totalCost supplied for %s", label), ex); + } } /** diff --git a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java index 421ee9e64..9c7e14465 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java +++ b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java @@ -26,7 +26,8 @@ public class ScaledProcessingCost extends ProcessingCost { private final long multiplier_; protected ScaledProcessingCost(ProcessingCost cost, long multiplier) { - Preconditions.checkArgument(cost.isValid(), "ScaledProcessingCost: cost is invalid!"); + Preconditions.checkArgument( + cost.isValid(), "ScaledProcessingCost: cost is invalid! %s", cost); Preconditions.checkArgument( multiplier >= 0, "ScaledProcessingCost: multiplier must be non-negative!"); cost_ = cost; diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index 08d6c3129..9cb78b67e 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -345,6 +345,7 @@ abstract public class ScanNode extends PlanNode { return capInputCardinalityWithLimit(inputCardinality_); } + // May return -1. // TODO: merge this with getInputCardinality(). public long getFilteredInputCardinality() { return capInputCardinalityWithLimit( @@ -396,8 +397,6 @@ abstract public class ScanNode extends PlanNode { * number of scan ranges and related query options. */ protected int computeMaxScannerThreadsForCPC(TQueryOptions queryOptions) { - Preconditions.checkArgument(queryOptions.isCompute_processing_cost()); - // maxThread calculation below intentionally does not include core count from // executor group config. This is to allow scan fragment parallelism to scale // regardless of the core count limit. 
@@ -417,8 +416,6 @@ abstract public class ScanNode extends PlanNode { * the return value of this method. */ protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) { - Preconditions.checkArgument(queryOptions.isCompute_processing_cost()); - int maxScannerThreads = computeMaxScannerThreadsForCPC(queryOptions); long inputCardinality = getFilteredInputCardinality(); diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index b82c24c8e..4d120818e 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -473,10 +473,12 @@ public class SortNode extends PlanNode { // TODO: Benchmark partial sort cost separately. // TODO: Improve this for larger spilling sorts. double totalCost = 0.0F; - long inputCardinality = getChild(0).getFilteredCardinality(); + long inputCardinality = Math.max(0, getChild(0).getFilteredCardinality()); + double log2InputCardinality = + inputCardinality <= 0 ? 0.0 : (Math.log(inputCardinality) / Math.log(2)); if (type_ == TSortType.TOTAL || type_ == TSortType.PARTIAL) { if (avgRowSize_ <= 10) { - totalCost = inputCardinality * (Math.log(inputCardinality) / Math.log(2)) + totalCost = inputCardinality * log2InputCardinality * COST_COEFFICIENT_SORT_TOTAL_SMALL_ROW; } else { double fullInputSize = inputCardinality * avgRowSize_; @@ -487,8 +489,7 @@ public class SortNode extends PlanNode { Preconditions.checkState( type_ == TSortType.TOPN || type_ == TSortType.PARTITIONED_TOPN); // Benchmarked TopN sort costs were ~ NlogN rows. 
- totalCost = inputCardinality * (Math.log(inputCardinality) / Math.log(2)) - * COST_COEFFICIENT_SORT_TOPN; + totalCost = inputCardinality * log2InputCardinality * COST_COEFFICIENT_SORT_TOPN; } if (LOG.isTraceEnabled()) { LOG.trace("Sort CPU cost estimate: " + totalCost + ", Type: " + type_ diff --git a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java index fcbd6dfab..f567c8696 100644 --- a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java +++ b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java @@ -26,8 +26,10 @@ public class SumProcessingCost extends ProcessingCost { private final ProcessingCost cost2_; protected SumProcessingCost(ProcessingCost cost1, ProcessingCost cost2) { - Preconditions.checkArgument(cost1.isValid(), "SumProcessingCost: cost1 is invalid!"); - Preconditions.checkArgument(cost2.isValid(), "SumProcessingCost: cost2 is invalid!"); + Preconditions.checkArgument( + cost1.isValid(), "SumProcessingCost: cost1 is invalid! %s", cost1); + Preconditions.checkArgument( + cost2.isValid(), "SumProcessingCost: cost2 is invalid! 
%s", cost2); cost1_ = cost1; cost2_ = cost2; } diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index 527dac825..5ff654abe 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -164,7 +164,7 @@ public class UnionNode extends PlanNode { PlanNode child = children_.get(i); if (child.cardinality_ >= 0) { totalMaterializedCardinality = - checkedAdd(totalMaterializedCardinality, child.cardinality_); + checkedAdd(totalMaterializedCardinality, Math.max(0, child.cardinality_)); } } long estBytesMaterialized = diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java index 7e8418c99..48eb56bfd 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ExprNdvTest.java @@ -20,13 +20,25 @@ package org.apache.impala.analysis; import org.apache.impala.analysis.AnalysisContext.AnalysisResult; import org.apache.impala.common.FrontendTestBase; import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.RuntimeEnv; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; /** * Tests computeNumDistinctValues() estimates for Exprs */ public class ExprNdvTest extends FrontendTestBase { + @BeforeClass + public static void setUpClass() throws Exception { + RuntimeEnv.INSTANCE.setTestEnv(true); + } + + @AfterClass + public static void cleanUpClass() { + RuntimeEnv.INSTANCE.reset(); + } public void verifyNdv(String expr, long expectedNdv) throws ImpalaException { diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java index ae97fdef7..a6807c2e3 100644 --- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java +++ 
b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java @@ -18,6 +18,7 @@ package org.apache.impala.planner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Arrays; @@ -25,11 +26,14 @@ import java.util.List; import java.util.Set; import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.Frontend.PlanCtx; import org.apache.impala.testutil.TestUtils; import org.apache.impala.thrift.QueryConstants; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryOptions; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.ImmutableSet; @@ -42,6 +46,16 @@ public class CardinalityTest extends PlannerTestBase { private static double CARDINALITY_TOLERANCE = 0.05; + @BeforeClass + public static void setUpClass() throws Exception { + RuntimeEnv.INSTANCE.setTestEnv(true); + } + + @AfterClass + public static void cleanUpClass() { + RuntimeEnv.INSTANCE.reset(); + } + /** * Test the happy path: table with stats, no all-null cols. */ @@ -1117,6 +1131,9 @@ public class CardinalityTest extends PlannerTestBase { // the distributed plan). 
PlanNode currentNode = plan.get(plan.size() - 1).getPlanRoot(); for (Integer currentChildIndex: path) { + assertTrue(currentNode.getDisplayLabel() + " does not have child index " + + currentChildIndex, + currentNode.hasChild(currentChildIndex)); currentNode = currentNode.getChild(currentChildIndex); } assertEquals("PlanNode class not matched: ", cl.getName(), diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test index 94633f8e0..ded736b47 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test @@ -200,3 +200,81 @@ max-parallelism=1 segment-costs=[28] tuple-ids=0 row-size=12B cardinality=20 cost=3 in pipelines: 00(GETNEXT) ==== +# IMPALA-13152: Regression test for TOP-N query over empty table. +select field, rk from ( +select + field, f2, + row_number() + over (partition by field order by f2) rk +from functional.emptytable +) b +where rk = 1; +---- PARALLELPLANS +Max Per-Host Resource Reservation: Memory=48.00MB Threads=4 +Per-Host Resource Estimates: Memory=48MB +F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Instance Resources: mem-estimate=4.06MB mem-reservation=4.00MB thread-reservation=1 +| max-parallelism=1 segment-costs=[0] cpu-comparison-result=2 [max(1 (self) vs 2 (sum children))] +PLAN-ROOT SINK +| output exprs: field, row_number() +| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=0 +| +06:EXCHANGE [UNPARTITIONED] +| mem-estimate=64.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=4,3 row-size=24B cardinality=0 cost=0 +| in pipelines: 05(GETNEXT) +| +F01:PLAN FRAGMENT [HASH(field)] hosts=1 instances=2 (adjusted from 1) +Per-Instance Resources: mem-estimate=16.12MB mem-reservation=16.00MB thread-reservation=1 +max-parallelism=1 
segment-costs=[0, 0] cpu-comparison-result=2 [max(2 (self) vs 1 (sum children))] +03:SELECT +| predicates: row_number() = CAST(1 AS BIGINT) +| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| tuple-ids=4,3 row-size=24B cardinality=0 cost=0 +| in pipelines: 05(GETNEXT) +| +02:ANALYTIC +| functions: row_number() +| partition by: field +| order by: f2 ASC +| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=4,3 row-size=24B cardinality=0 cost=0 +| in pipelines: 05(GETNEXT) +| +05:TOP-N +| partition by: field +| order by: f2 ASC +| partition limit: 1 +| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=4 row-size=16B cardinality=0 cost=0 +| in pipelines: 05(GETNEXT), 01(OPEN) +| +04:EXCHANGE [HASH(field)] +| mem-estimate=20.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=4 row-size=16B cardinality=0 cost=0 +| in pipelines: 01(GETNEXT) +| +F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 +Per-Instance Resources: mem-estimate=12.16MB mem-reservation=12.00MB thread-reservation=1 +max-parallelism=1 segment-costs=[0, 0] +01:TOP-N +| partition by: field +| order by: f2 ASC +| partition limit: 1 +| source expr: row_number() = CAST(1 AS BIGINT) +| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=4 row-size=16B cardinality=0 cost=0 +| in pipelines: 01(GETNEXT), 00(OPEN) +| +00:SCAN HDFS [functional.emptytable, RANDOM] + partitions=0/0 files=0 size=0B + stored statistics: + table: rows=unavailable size=unavailable + partitions: 0/0 rows=0 + columns missing stats: field + extrapolated-rows=disabled max-scan-range-rows=0 + mem-estimate=0B mem-reservation=0B thread-reservation=0 + tuple-ids=0 row-size=16B cardinality=0 cost=0 + in pipelines: 00(GETNEXT) +==== \ No newline at end of file diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test index 1c1293a91..d2c0cbe9e 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q43-verbose.test @@ -30,29 +30,29 @@ SumCost: cost-total=956 max-instances=1 adj-instances=1 cost/inst=956 #cons:#pro PLAN-ROOT SINK | output exprs: s_store_name, s_store_id, sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END [...] | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=900 - | cost-total=900 max-instances=1 adj-instances=1 cost/inst=900 #cons:#prod=100:100 reduction=1.0 cost/cons=9.0 cost/prod=9.0 total-cost=900.0 cardinality=100 + | cost-total=900 max-instances=1 adj-instances=1 cost/inst=900 #cons:#prod=100:100 reduction=1.0 cost/cons=9.0 cost/prod=9.0 raw-cost=900.0 cardinality=100 | 11:MERGING-EXCHANGE [UNPARTITIONED] order by: s_store_name ASC, s_store_id ASC, sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Friday') THEN ss_s [...] 
limit: 100 mem-estimate=157.71KB mem-reservation=0B thread-reservation=0 - cost-total=56 max-instances=1 adj-instances=1 cost/inst=56 #cons:#prod=100:100 reduction=1.0 cost/cons=0.56 cost/prod=0.56 total-cost=56.476 + cost-total=56 max-instances=1 adj-instances=1 cost/inst=56 #cons:#prod=100:100 reduction=1.0 cost/cons=0.56 cost/prod=0.56 raw-cost=56.476 tuple-ids=4 row-size=156B cardinality=100 cost=56 in pipelines: 06(GETNEXT) F03:PLAN FRAGMENT [HASH(s_store_name,s_store_id)] hosts=10 instances=10 (adjusted from 120) Per-Instance Resources: mem-estimate=28.84MB mem-reservation=1.94MB thread-reservation=1 max-parallelism=10 segment-costs=[48483, 48650, 439] cpu-comparison-result=120 [max(10 (self) vs 120 (sum children))] -cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 total-cost=439.9666 -cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 total-cost=48650.03791799733 +cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 raw-cost=439.9666 +cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 raw-cost=48650.03791799733 SumCost: cost-total=48483 max-instances=1 adj-instances=10 cost/inst=4849 #cons:#prod=6780:6780 reduction=1.0 cost/cons=7.150885 cost/prod=7.150885 DATASTREAM SINK [FRAGMENT=F04, EXCHANGE=11, UNPARTITIONED] | mem-estimate=639.76KB mem-reservation=0B thread-reservation=0 cost=439 - | cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 total-cost=439.9666 + | cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 raw-cost=439.9666 06:TOP-N [LIMIT=100] | order by: s_store_name ASC, s_store_id ASC, 
sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Friday') THEN ss_s [...] | mem-estimate=15.23KB mem-reservation=0B thread-reservation=0 - | cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 total-cost=48650.03791799733 + | cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 raw-cost=48650.03791799733 | tuple-ids=4 row-size=156B cardinality=100 cost=48650 | in pipelines: 06(GETNEXT), 10(OPEN) | @@ -66,7 +66,7 @@ SumCost: cost-total=48483 max-instances=1 adj-instances=10 cost/inst=4849 #cons: | 09:EXCHANGE [HASH(s_store_name,s_store_id)] mem-estimate=18.84MB mem-reservation=0B thread-reservation=0 - cost-total=5616 max-instances=1 adj-instances=10 cost/inst=562 #cons:#prod=6780:6780 reduction=1.0 cost/cons=0.8283186 cost/prod=0.8283186 total-cost=5616.677 + cost-total=5616 max-instances=1 adj-instances=10 cost/inst=562 #cons:#prod=6780:6780 reduction=1.0 cost/cons=0.8283186 cost/prod=0.8283186 raw-cost=5616.677 tuple-ids=3 row-size=156B cardinality=6.78K cost=5616 in pipelines: 01(GETNEXT) @@ -74,11 +74,11 @@ F00:PLAN FRAGMENT [RANDOM] hosts=10 instances=120 Per-Host Shared Resources: mem-estimate=2.00MB mem-reservation=2.00MB thread-reservation=0 runtime-filters-memory=2.00MB Per-Instance Resources: mem-estimate=32.25MB mem-reservation=11.00MB thread-reservation=1 max-parallelism=330 segment-costs=[3290070641, 71580] cpu-comparison-result=120 [max(120 (self) vs 22 (sum children))] -cost-total=71580 max-instances=1 
adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 total-cost=71580.922 +cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 raw-cost=71580.922 SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=27417254 #cons:#prod=8639935193:888242617 reduction=9.726999 cost/cons=0.38079804 cost/prod=3.7040224 DATASTREAM SINK [FRAGMENT=F03, EXCHANGE=09, HASH(s_store_name,s_store_id)] | mem-estimate=6.25MB mem-reservation=0B thread-reservation=0 cost=71580 - | cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 total-cost=71580.922 + | cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 raw-cost=71580.922 05:AGGREGATE [STREAMING] | output: sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = ' [...] 
| group by: s_store_name, s_store_id @@ -92,7 +92,7 @@ SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=274 | hash predicates: ss_store_sk = s_store_sk | fk/pk conjuncts: ss_store_sk = s_store_sk | mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0 - | cost-total=614141162 max-instances=62 adj-instances=120 cost/inst=5117843 #cons:#prod=1766829189:888242617 reduction=1.9891291 cost/cons=0.3475951 cost/prod=0.6914115 total-cost=6.141411629949E8 + | cost-total=614141162 max-instances=62 adj-instances=120 cost/inst=5117843 #cons:#prod=1766829189:888242617 reduction=1.9891291 cost/cons=0.3475951 cost/prod=0.6914115 raw-cost=6.141411629949E8 | tuple-ids=1,0,2 row-size=91B cardinality=888.24M cost=614141162 | in pipelines: 01(GETNEXT), 02(OPEN) | @@ -101,7 +101,7 @@ SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=274 | hash predicates: ss_sold_date_sk = d_date_sk | fk/pk conjuncts: ss_sold_date_sk = d_date_sk | mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0 - | cost-total=773340988 max-instances=78 adj-instances=120 cost/inst=6444509 #cons:#prod=8639935566:1766828853 reduction=4.890081 cost/cons=0.08950773 cost/prod=0.4377 total-cost=7.733409889581001E8 + | cost-total=773340988 max-instances=78 adj-instances=120 cost/inst=6444509 #cons:#prod=8639935566:1766828853 reduction=4.890081 cost/cons=0.08950773 cost/prod=0.4377 raw-cost=7.733409889581001E8 | tuple-ids=1,0 row-size=39B cardinality=1.77G cost=773340988 | in pipelines: 01(GETNEXT), 00(OPEN) | @@ -115,7 +115,7 @@ SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=274 extrapolated-rows=disabled max-scan-range-rows=390.22M est-scan-range=374(filtered from 1824) file formats: [PARQUET] mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=0 - cost-total=305308025 max-instances=31 adj-instances=120 cost/inst=2544234 #cons:#prod=8639935193:8639935193 reduction=1.0 
cost/cons=0.03533684 cost/prod=0.03533684 total-cost=3.053080257984E8 + cost-total=305308025 max-instances=31 adj-instances=120 cost/inst=2544234 #cons:#prod=8639935193:8639935193 reduction=1.0 cost/cons=0.03533684 cost/prod=0.03533684 raw-cost=3.053080257984E8 tuple-ids=1 row-size=12B cardinality=1.77G(filtered from 8.64G) cost=305308025 in pipelines: 01(GETNEXT) @@ -128,7 +128,7 @@ SumCost: cost-total=776 max-instances=1 adj-instances=10 cost/inst=78 #cons:#pro | build expressions: s_store_sk | runtime filters: RF000[bloom] <- s_store_sk, RF001[min_max] <- s_store_sk | mem-estimate=23.25MB mem-reservation=23.25MB spill-buffer=64.00KB thread-reservation=0 cost=336 - | cost-total=336 max-instances=1 adj-instances=10 cost/inst=34 #cons:#prod=336:336 reduction=1.0 cost/cons=1.0 cost/prod=1.0 total-cost=336.0 + | cost-total=336 max-instances=1 adj-instances=10 cost/inst=34 #cons:#prod=336:336 reduction=1.0 cost/cons=1.0 cost/prod=1.0 raw-cost=336.0 | 08:EXCHANGE [BROADCAST] mem-estimate=35.40KB mem-reservation=0B thread-reservation=0 @@ -142,7 +142,7 @@ max-parallelism=1 segment-costs=[635] SumCost: cost-total=635 max-instances=1 adj-instances=1 cost/inst=635 #cons:#prod=1350:336 reduction=4.017857 cost/cons=0.47037038 cost/prod=1.8898809 DATASTREAM SINK [FRAGMENT=F05, EXCHANGE=08, BROADCAST] | mem-estimate=223.76KB mem-reservation=0B thread-reservation=0 cost=54 - | cost-total=54 max-instances=1 adj-instances=1 cost/inst=54 #cons:#prod=336:336 reduction=1.0 cost/cons=0.16071428 cost/prod=0.16071428 total-cost=54.3753 + | cost-total=54 max-instances=1 adj-instances=1 cost/inst=54 #cons:#prod=336:336 reduction=1.0 cost/cons=0.16071428 cost/prod=0.16071428 raw-cost=54.3753 02:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM] HDFS partitions=1/1 files=1 size=9.81KB predicates: s_gmt_offset = CAST(-6 AS DECIMAL(3,0)) @@ -154,7 +154,7 @@ SumCost: cost-total=635 max-instances=1 adj-instances=1 cost/inst=635 #cons:#pro parquet dictionary predicates: s_gmt_offset = 
CAST(-6 AS DECIMAL(3,0)) file formats: [PARQUET] mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0 - cost-total=581 max-instances=1 adj-instances=1 cost/inst=581 #cons:#prod=1350:336 reduction=4.017857 cost/cons=0.43037036 cost/prod=1.7291666 total-cost=581.0742 + cost-total=581 max-instances=1 adj-instances=1 cost/inst=581 #cons:#prod=1350:336 reduction=4.017857 cost/cons=0.43037036 cost/prod=1.7291666 raw-cost=581.0742 tuple-ids=2 row-size=52B cardinality=336 cost=581 in pipelines: 02(GETNEXT) @@ -167,7 +167,7 @@ SumCost: cost-total=863 max-instances=1 adj-instances=10 cost/inst=87 #cons:#pro | build expressions: d_date_sk | runtime filters: RF002[bloom] <- d_date_sk | mem-estimate=23.25MB mem-reservation=23.25MB spill-buffer=64.00KB thread-reservation=0 cost=373 - | cost-total=373 max-instances=1 adj-instances=10 cost/inst=38 #cons:#prod=373:373 reduction=1.0 cost/cons=1.0 cost/prod=1.0 total-cost=373.0 + | cost-total=373 max-instances=1 adj-instances=10 cost/inst=38 #cons:#prod=373:373 reduction=1.0 cost/cons=1.0 cost/prod=1.0 raw-cost=373.0 | 07:EXCHANGE [BROADCAST] mem-estimate=21.23KB mem-reservation=0B thread-reservation=0 @@ -181,7 +181,7 @@ max-parallelism=1 segment-costs=[18016] SumCost: cost-total=18016 max-instances=1 adj-instances=1 cost/inst=18016 #cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24662897 cost/prod=48.300266 DATASTREAM SINK [FRAGMENT=F06, EXCHANGE=07, BROADCAST] | mem-estimate=124.57KB mem-reservation=0B thread-reservation=0 cost=35 - | cost-total=35 max-instances=1 adj-instances=1 cost/inst=35 #cons:#prod=373:373 reduction=1.0 cost/cons=0.09383378 cost/prod=0.09383378 total-cost=35.391600000000004 + | cost-total=35 max-instances=1 adj-instances=1 cost/inst=35 #cons:#prod=373:373 reduction=1.0 cost/cons=0.09383378 cost/prod=0.09383378 raw-cost=35.391600000000004 00:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM] HDFS partitions=1/1 files=1 size=2.17MB predicates: d_year = CAST(1999 AS INT) @@ -193,7 
+193,7 @@ SumCost: cost-total=18016 max-instances=1 adj-instances=1 cost/inst=18016 #cons: parquet dictionary predicates: d_year = CAST(1999 AS INT) file formats: [PARQUET] mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0 - cost-total=17981 max-instances=1 adj-instances=1 cost/inst=17981 #cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24614984 cost/prod=48.206436 total-cost=17981.5393 + cost-total=17981 max-instances=1 adj-instances=1 cost/inst=17981 #cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24614984 cost/prod=48.206436 raw-cost=17981.5393 tuple-ids=0 row-size=27B cardinality=373 cost=17981 in pipelines: 00(GETNEXT) ====
