Mostafa Mokhtar created IMPALA-6746:
---------------------------------------
Summary: Reduce the number of comparisons for analytical functions
with partitioning when incoming data is clustered
Key: IMPALA-6746
URL: https://issues.apache.org/jira/browse/IMPALA-6746
Project: IMPALA
Issue Type: Improvement
Components: Backend
Affects Versions: Impala 2.13.0
Reporter: Mostafa Mokhtar
Assignee: Tianyi Wang
Attachments: percentile query profile 2.txt
Checking whether the current row belongs to the same partition in ANALYTIC is
very expensive, as it does N comparisons, where N is the number of rows. When
the cardinality of the partition column(s) is relatively small, the values will
be clustered, so most consecutive rows fall in the same partition.
One optimization, as proposed by [~alex.behm], is to check the first and last
tuples in the batch and, if they match, avoid calling
AnalyticEvalNode::PrevRowCompare for the entire batch.
For the attached query, which is a common pattern, the expected speedup is 20-30%.
Query
{code}
select l_commitdate
,avg(l_extendedprice) as avg_perc
,percentile_cont (.25) within group (order by l_extendedprice asc) as
perc_25
,percentile_cont (.5) within group (order by l_extendedprice asc) as perc_50
,percentile_cont (.75) within group (order by l_extendedprice asc) as
perc_75
,percentile_cont (.90) within group (order by l_extendedprice asc) as
perc_90
from lineitem
group by l_commitdate
order by l_commitdate
{code}
Plan
{code}
F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B
|
09:MERGING-EXCHANGE [UNPARTITIONED]
| order by: l_commitdate ASC
| mem-estimate=0B mem-reservation=0B
| tuple-ids=5 row-size=66B cardinality=2559
|
F02:PLAN FRAGMENT [HASH(l_commitdate)] hosts=1 instances=1
Per-Host Resources: mem-estimate=22.00MB mem-reservation=13.94MB
05:SORT
| order by: l_commitdate ASC
| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB
| tuple-ids=5 row-size=66B cardinality=2559
|
08:AGGREGATE [FINALIZE]
| output: avg:merge(l_extendedprice),
_percentile_cont_interpolation:merge(l_extendedprice,
`_percentile_row_number_diff_0`),
_percentile_cont_interpolation:merge(l_extendedprice,
`_percentile_row_number_diff_1`),
_percentile_cont_interpolation:merge(l_extendedprice,
`_percentile_row_number_diff_2`),
_percentile_cont_interpolation:merge(l_extendedprice,
`_percentile_row_number_diff_3`)
| group by: l_commitdate
| mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
| tuple-ids=4 row-size=66B cardinality=2559
|
07:EXCHANGE [HASH(l_commitdate)]
| mem-estimate=0B mem-reservation=0B
| tuple-ids=3 row-size=66B cardinality=2559
|
F01:PLAN FRAGMENT [HASH(l_commitdate)] hosts=1 instances=1
Per-Host Resources: mem-estimate=64.00MB mem-reservation=22.00MB
04:AGGREGATE [STREAMING]
| output: avg(l_extendedprice),
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 -
count(l_extendedprice) - 1 * 0.25),
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 -
count(l_extendedprice) - 1 * 0.5),
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 -
count(l_extendedprice) - 1 * 0.75),
_percentile_cont_interpolation(l_extendedprice, row_number() - 1 -
count(l_extendedprice) - 1 * 0.90)
| group by: l_commitdate
| mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
| tuple-ids=3 row-size=66B cardinality=2559
|
03:ANALYTIC
| functions: count(l_extendedprice)
| partition by: l_commitdate
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
| tuple-ids=9,7,8 row-size=50B cardinality=59986052
|
02:ANALYTIC
| functions: row_number()
| partition by: l_commitdate
| order by: l_extendedprice ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
| tuple-ids=9,7 row-size=42B cardinality=59986052
|
01:SORT
| order by: l_commitdate ASC NULLS FIRST, l_extendedprice ASC NULLS LAST
| mem-estimate=46.00MB mem-reservation=12.00MB spill-buffer=2.00MB
| tuple-ids=9 row-size=34B cardinality=59986052
|
06:EXCHANGE [HASH(l_commitdate)]
| mem-estimate=0B mem-reservation=0B
| tuple-ids=0 row-size=34B cardinality=59986052
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
00:SCAN HDFS [tpch_10_parquet.lineitem, RANDOM]
partitions=1/1 files=15 size=2.05GB
stored statistics:
table: rows=59986052 size=2.05GB
columns: all
extrapolated-rows=disabled
mem-estimate=88.00MB mem-reservation=0B
tuple-ids=0 row-size=34B cardinality=59986052
{code}
Call stack
{code}
libc.so.6!__memcmp_sse4_1 - memcmp-sse4.S
impalad!StringCompare+0x14 - string-value.inline.h:40
impalad!impala::StringValue::Eq+0x23 - string-value.inline.h:62
impalad!impala::StringValue::operator==+0 - string-value.inline.h:66
impalad!impala::Operators::Eq_StringVal_StringVal+0xd - operators-ir.cc:227
impalad!impala::ScalarFnCall::InterpretEval<impala_udf::BooleanVal>+0x597 -
scalar-fn-call.cc:485
impalad!impala::ScalarFnCall::GetBooleanVal+0x24 - scalar-fn-call.cc:536
impalad!impala::AndPredicate::GetBooleanVal+0x4d - compound-predicates.cc:36
impalad!impala::OrPredicate::GetBooleanVal+0x4d - compound-predicates.cc:56
impalad!impala::AndPredicate::GetBooleanVal+0x29 - compound-predicates.cc:33
impalad!impala::ScalarExprEvaluator::GetBooleanVal+0x16 -
scalar-expr-evaluator.cc:368
impalad!impala::AnalyticEvalNode::PrevRowCompare+0xb - analytic-eval-node.cc:591
impalad!impala::AnalyticEvalNode::ProcessChildBatch+0x227 -
analytic-eval-node.cc:644
impalad!impala::AnalyticEvalNode::ProcessChildBatches+0xf5 -
analytic-eval-node.cc:604
impalad!impala::AnalyticEvalNode::GetNext+0x269 - analytic-eval-node.cc:786
impalad!impala::AnalyticEvalNode::ProcessChildBatches+0xbf -
analytic-eval-node.cc:602
impalad!impala::AnalyticEvalNode::GetNext+0x269 - analytic-eval-node.cc:786
impalad!impala::PartitionedAggregationNode::GetRowsStreaming+0xa6 -
partitioned-aggregation-node.cc:478
impalad!impala::PartitionedAggregationNode::GetNext+0x221 -
partitioned-aggregation-node.cc:369
impalad!impala::FragmentInstanceState::ExecInternal+0x1b1 -
fragment-instance-state.cc:277
impalad!impala::FragmentInstanceState::Exec+0x29e -
fragment-instance-state.cc:89
impalad!impala::QueryState::ExecFInstance+0x249 - query-state.cc:394
impalad!boost::function0<void>::operator()+0x1a - function_template.hpp:767
impalad!impala::Thread::SuperviseThread+0x2e4 - thread.cc:356
impalad!operator()<void (*)(const std::basic_string<char>&, const
std::basic_string<char>&, boost::function<void()>, const
impala::ThreadDebugInfo*, impala::Promise<long int>*), boost::_bi::list0>+0x5b
- bind.hpp:525
impalad!boost::_bi::bind_t<void, void (*)(std::string const&, std::string
const&, boost::function<void (void)>, impala::ThreadDebugInfo const*,
impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>,
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void
(void)>>, boost::_bi::value<impala::ThreadDebugInfo*>,
boost::_bi::value<impala::Promise<long>*>>>::operator()+0 - bind_template.hpp:20
impalad!boost::detail::thread_data<boost::_bi::bind_t<void, void
(*)(std::string const&, std::string const&, boost::function<void (void)>,
impala::ThreadDebugInfo const*, impala::Promise<long>*),
boost::_bi::list5<boost::_bi::value<std::string>,
boost::_bi::value<std::string>, boost::_bi::value<boost::function<void
(void)>>, boost::_bi::value<impala::ThreadDebugInfo*>,
boost::_bi::value<impala::Promise<long>*>>>>::run+0x1e - thread.hpp:116
impalad!thread_proxy+0xd9 - [Unknown]:[Unknown]
libpthread.so.0!start_thread+0xc1 - pthread_create.c:312
libc.so.6!__clone+0x6c - clone.S:111
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)