[
https://issues.apache.org/jira/browse/IMPALA-10992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698398#comment-17698398
]
ASF subversion and git services commented on IMPALA-10992:
----------------------------------------------------------
Commit 29ad046d05869bed7489bc487636e0f64b3328aa in impala's branch
refs/heads/master from Qifan Chen
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=29ad046d0 ]
IMPALA-11604 (part 1): Model ProcessingCost for PlanNodes & DataSink
This patch augments IMPALA-10992 by establishing a model to allow the
weighted total amount of data to process to be used as a new factor in
the definition and selection of an executor group. We call this model
ProcessingCost.
ProcessingCost of a PlanNode/DataSink is a weighted amount of data
processed by that node/sink. The basic ProcessingCost is computed with a
general formula as follows.
ProcessingCost is a pair PC(D, N), where D = I * (C + M):
  D is the weighted amount of data processed.
  I is the input cardinality.
  C is the expression evaluation cost per row, set to the total weight
    of expression evaluation in the node/sink.
  M is the materialization cost per row, used only by scan and exchange
    nodes; otherwise 0.
  N is the number of instances, defaulting to D / 10000000.
In this patch, the weight of each expression evaluation is set to a
constant of 1. A description of the computation for each kind of
PlanNode/DataSink is given below.
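The general formula above can be sketched in a few lines. This is an
illustrative model only; the helper name processing_cost and the constant
COST_PER_INSTANCE are assumptions for the sketch, not Impala's actual
identifiers.

```python
import math

# Hypothetical constant matching the "N defaults to D / 10000000" rule above.
COST_PER_INSTANCE = 10_000_000

def processing_cost(input_cardinality, expr_cost_per_row,
                    materialization_cost_per_row=0.0, num_instances=None):
    """Return the pair PC(D, N) for a node/sink.

    D = I * (C + M), with I the input cardinality, C the per-row
    expression evaluation cost, and M the per-row materialization cost
    (nonzero only for scan and exchange nodes).
    """
    d = input_cardinality * (expr_cost_per_row + materialization_cost_per_row)
    if num_instances is None:
        # N defaults to D / 10000000, with at least one instance.
        num_instances = max(1, math.ceil(d / COST_PER_INSTANCE))
    return d, num_instances
```

For example, under this sketch a node evaluating one unit-weight expression
over 20 million rows gets D = 20,000,000 and N = 2.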
01. AggregationNode:
Each AggregateInfo has its C set to the sum of the costs of its grouping
and aggregate expressions, and is assigned its own ProcessingCost. These
ProcessingCosts are then summed to form the AggregationNode's
ProcessingCost;
02. AnalyticEvalNode:
C is the sum of the evaluation costs for analytic functions;
03. CardinalityCheckNode:
Use the general formula, I = 1;
04. DataSourceScanNode:
Follow the formula from the superclass ScanNode;
05. EmptySetNode:
I = 0;
06. ExchangeNode:
M = (average serialized row size) / 1024
A modification of the general formula when in broadcast mode:
D = D * number of receivers;
07. HashJoinNode:
probe cost = PC(I0 * C(equi-join predicate), N) +
PC(output cardinality * C(other-join predicate), N)
build cost = PC(I1 * C(equi-join predicate), N)
with I0 and I1 as the input cardinality of the probe and build side,
respectively. If the plan node does not have a separate build, its
ProcessingCost is the sum of the probe cost and the build cost.
Otherwise, its ProcessingCost is equal to the probe cost;
08. HbaseScanNode, HdfsScanNode, and KuduScanNode:
Follow the formula from the superclass ScanNode;
09. NestedLoopJoinNode:
When the right child is not a SingularRowSrcNode:
probe cost = PC(I0 * C(equi-join predicate), N) +
PC(output cardinality * C(other-join predicate), N)
build cost = PC(I1 * C(equi-join predicate), N)
When the right child is a SingularRowSrcNode:
probe cost = PC(I0, N)
build cost = PC(I0 * I1, N)
with I0 and I1 as the input cardinality of the probe and build side,
respectively. If the plan node does not have a separate build, its
ProcessingCost is the sum of the probe cost and the build cost.
Otherwise, its ProcessingCost is equal to the probe cost;
10. ScanNode:
M = (average row size) / 1024;
11. SelectNode:
Use the general formula;
12. SingularRowSrcNode:
Since this node is invoked once per input row of the nested loop join,
its contribution is accounted for in the nested loop join's cost;
13. SortNode:
C is the evaluation cost for the sort expression;
14. SubplanNode:
C is 1. I is the product of the cardinalities of the left and the right
child;
15. UnionNode:
C is the cost of result expression evaluation from all non-pass-through
children;
16. UnnestNode:
I is the cardinality of the containing SubplanNode and C is 1;
17. DataStreamSink:
M = 1 / num rows per batch.
18. JoinBuildSink:
ProcessingCost is the build cost of its associated JoinNode.
19. PlanRootSink:
If result spooling is enabled, C is the cost of output expression
evaluation. Otherwise, ProcessingCost is zero;
20. TableSink:
C is the cost of output expression evaluation.
TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
KuduTableSink) follow the same formula.
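As an illustration of two of the rules above (items 06 and 10), the
following sketch plugs made-up numbers into D = I * (C + M). The helper
name weighted_cost and all figures (row sizes, cardinalities, receiver
count) are hypothetical, chosen only to show the arithmetic.

```python
# Compute D = I * (C + M) for a node; purely illustrative helper.
def weighted_cost(input_cardinality, expr_cost_per_row, mat_cost_per_row=0.0):
    return input_cardinality * (expr_cost_per_row + mat_cost_per_row)

# 10. ScanNode: M = (average row size) / 1024.
avg_row_size_bytes = 256
scan_d = weighted_cost(1_000_000, 1, avg_row_size_bytes / 1024)

# 06. ExchangeNode: M = (average serialized row size) / 1024, and in
# broadcast mode D is multiplied by the number of receivers.
avg_serialized_row_size = 512
num_receivers = 3
exchange_d = weighted_cost(1_000_000, 1, avg_serialized_row_size / 1024)
broadcast_d = exchange_d * num_receivers
```

With these numbers, the scan's D is 1,000,000 * (1 + 0.25) = 1,250,000 and
the broadcast exchange's D is 1,000,000 * (1 + 0.5) * 3 = 4,500,000.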
Part 2 of IMPALA-11604 will implement an algorithm that tries to adjust
the number of instances for each fragment by considering their
production-consumption ratio, and then finally returns a number
representing an ideal CPU core count required for a query to run
efficiently.
Testing:
- Pass FE tests.
Co-authored-by: Riza Suminto <[email protected]>
Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Reviewed-on: http://gerrit.cloudera.org:8080/19033
Reviewed-by: Wenzhe Zhou <[email protected]>
Reviewed-by: Kurt Deschler <[email protected]>
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Riza Suminto <[email protected]>
> Planner changes for estimate peak memory.
> -----------------------------------------
>
> Key: IMPALA-10992
> URL: https://issues.apache.org/jira/browse/IMPALA-10992
> Project: IMPALA
> Issue Type: Task
> Reporter: Amogh Margoor
> Assignee: Qifan Chen
> Priority: Critical
> Fix For: Impala 4.1.0
>
>
> To run large queries on a larger executor group mapped to a different
> resource group, we need to identify large queries at compile time. In
> the first phase, we can use peak memory estimation to classify large
> queries. This Jira tracks that support.