[ https://issues.apache.org/jira/browse/IMPALA-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Ho resolved IMPALA-5168.
--------------------------------
Resolution: Fixed
[https://github.com/apache/impala/commit/51ff47d05e6edf80ca07805c68b417dec789d85b]
{noformat}
IMPALA-5168: Codegen HASH_PARTITIONED KrpcDataStreamSender::Send()
This change codegens the hash partitioning logic of
KrpcDataStreamSender::Send() when the partitioning strategy
is HASH_PARTITIONED. It does so by unrolling the loop that
evaluates each row against the partitioning expressions and
hashes the results. It also replaces the sender's number of
channels with a constant in the code generated at runtime.
With this change, the following benchmarks show speedups of roughly 3% to 7%:
+-------------------------+-----------------------+---------+------------+------------+----------------+
| Workload                | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+-------------------------+-----------------------+---------+------------+------------+----------------+
| TPCH(_300)              | parquet / none / none | 20.03   | -6.44%     | 13.56      | -7.15%         |
| TARGETED-PERF(_300)     | parquet / none / none | 58.59   | -5.56%     | 12.28      | -5.30%         |
| TPCDS-UNMODIFIED(_1000) | parquet / none / none | 15.60   | -3.10%     | 7.16       | -4.33%         |
| TPCH_NESTED(_300)       | parquet / none / none | 30.93   | -3.02%     | 17.46      | -4.71%         |
+-------------------------+-----------------------+---------+------------+------------+----------------+
Change-Id: I1c44cc9312c062cc7a5a4ac9156ceaa31fb887ff
Reviewed-on: http://gerrit.cloudera.org:8080/10421
Reviewed-by: Michael Ho
Tested-by: Impala Public Jenkins
{noformat}
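The commit message names two optimizations: unrolling the per-row loop over the partitioning expressions, and baking the channel count into the generated code as a constant. Impala does this by generating LLVM IR at runtime; the sketch below only imitates the effect with C++ templates so the difference is visible in ordinary code. Everything in it is hypothetical (`Row`, `HashInt64Fnv`, `ChannelGeneric`, `ChannelSpecialized`), partition expressions are reduced to "read one int64 column", and the plain 32-bit FNV-1a hash may differ in detail from `RawValue::GetHashValueFnv`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified row: every partition expression just reads one
// int64 column. Real Impala evaluates arbitrary expressions per row.
using Row = std::vector<int64_t>;

constexpr uint32_t kFnvPrime = 0x01000193;  // 32-bit FNV prime
constexpr uint32_t kFnvSeed = 0x811C9DC5;   // 32-bit FNV offset basis

// 32-bit FNV-1a over the 8 bytes of v (least significant byte first),
// continuing from 'hash'. Assumed stand-in for RawValue::GetHashValueFnv.
inline uint32_t HashInt64Fnv(int64_t v, uint32_t hash) {
  const uint64_t bits = static_cast<uint64_t>(v);
  for (int b = 0; b < 8; ++b) {
    hash ^= static_cast<uint32_t>((bits >> (8 * b)) & 0xFF);
    hash *= kFnvPrime;
  }
  return hash;
}

// Interpreted shape: the number of expressions and the channel count are only
// known at runtime, so the compiler must keep the loop and a general modulo.
int ChannelGeneric(const Row& row, const std::vector<std::size_t>& expr_cols,
                   int num_channels) {
  assert(num_channels > 0);
  uint32_t hash = kFnvSeed;
  for (std::size_t col : expr_cols) hash = HashInt64Fnv(row[col], hash);
  return static_cast<int>(hash % static_cast<uint32_t>(num_channels));
}

// Specialized shape, loosely analogous to the codegen'd Send(): the columns
// and the channel count are compile-time constants, so the fold below expands
// to one hash call per column (no loop) and the modulo is by a known constant.
template <int NumChannels, std::size_t... Cols>
int ChannelSpecialized(const Row& row) {
  static_assert(NumChannels > 0, "need at least one channel");
  uint32_t hash = kFnvSeed;
  ((hash = HashInt64Fnv(row[Cols], hash)), ...);  // C++17 fold, left to right
  return static_cast<int>(hash % static_cast<uint32_t>(NumChannels));
}
```

Both functions compute the same hash and therefore pick the same channel, e.g. `ChannelSpecialized<16, 0, 2>(row)` agrees with `ChannelGeneric(row, {0, 2}, 16)`. The specialized form simply removes the loop bookkeeping and lets the compiler turn the modulo by a constant into cheaper multiply and shift instructions, which is the kind of saving the benchmark table reflects.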
> Codegen hash computation in DataStreamSender::Send for partition exchange.
> ---------------------------------------------------------------------------
>
> Key: IMPALA-5168
> URL: https://issues.apache.org/jira/browse/IMPALA-5168
> Project: IMPALA
> Issue Type: Improvement
> Components: Backend
> Affects Versions: Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala 2.11.0
> Reporter: Mostafa Mokhtar
> Assignee: Michael Ho
> Priority: Major
> Labels: perfomance
>
> Hash partition computation for exchange operators can benefit from codegen.
> Profile data shows that ~20% of CPU time in the fragment thread is consumed by
> RawValue::GetHashValueFnv and ExprContext::GetValue (11% each in the profile below).
> {code}
> // hash-partition batch's rows across channels
> int num_channels = channels_.size();
> for (int i = 0; i < batch->num_rows(); ++i) {
>   TupleRow* row = batch->GetRow(i);
>   uint32_t hash_val = HashUtil::FNV_SEED;
>   // Fold the hash of every partition expression's value into hash_val.
>   for (int j = 0; j < partition_expr_ctxs_.size(); ++j) {
>     ExprContext* ctx = partition_expr_ctxs_[j];
>     void* partition_val = ctx->GetValue(row);
>     // We can't use the crc hash function here because it does not result
>     // in uncorrelated hashes with different seeds. Instead we must use
>     // fnv hash.
>     // TODO: fix crc hash/GetHashValue()
>     hash_val =
>         RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val);
>   }
>   ExprContext::FreeLocalAllocations(partition_expr_ctxs_);
>   // Route the row to the channel selected by the combined hash.
>   RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
> }
> {code}
> |Function Stack| Effective Time % |
> |Total|100%|
> | clone|99%|
> | start_thread|99%|
> | thread_proxy|99%|
> | boost::detail::thread_data<boost::_bi::bind_t<>::run|99%|
> | boost::_bi::bind_t<void, void (*)(), ::operator()|99%|
> | operator()<void (*)(const std::basic_string<|99%|
> | impala::Thread::SuperviseThread|99%|
> | boost::function0<void>::operator()|99%|
> | impala::QueryExecMgr::ExecFInstance|99%|
> | impala::FragmentInstanceState::Exec|99%|
> | impala::PlanFragmentExecutor::Exec|99%|
> | impala::PlanFragmentExecutor::ExecInternal|96%|
> | impala::DataStreamSender::Send|91%|
> | impala::DataStreamSender::Channel::AddRow|56%|
> | impala::RawValue::GetHashValueFnv|11%|
> | impala::ExprContext::GetValue|11%|
> | impala::ExprContext::FreeLocalAllocations|6%|
> | impala::RowBatch::GetRow|1%|
> | std::vector<impala::ExprContext*, std::allocator<impala::ExprContext*>>::size|1%|
> | impala::Expr::type|0%|
> | impala::ExprContext::GetValue|0%|
> | impala::RuntimeState::CheckQueryState|0%|
> | impala::HdfsScanNode::GetNext|3%|
> | impala::RowBatch::Reset|1%|
> | Status|0%|
> | ~ScopedTimer|0%|
> | [Unknown stack frame(s)]|4%|
> Query used in repro
> {code}
> select /* +straight_join */ count(*)
> from store_sales a join /* +shuffle */
> store_returns b on
> a.ss_item_sk = b.sr_item_sk
> where a.ss_ticket_number = b.sr_ticket_number and ss_sold_date_sk between
> 2450816 and 2451500 and sr_returned_date_sk between 2450816 and 2451500
> group by a.ss_ticket_number
> having count(*) > 9999999999
> {code}
> Explain plan
> {code}
> Estimated Per-Host Requirements: Memory=3.43GB VCores=3
>
> PLAN-ROOT SINK
> |
> 08:EXCHANGE [UNPARTITIONED]
> |
> 07:AGGREGATE [FINALIZE]
> |  output: count:merge(*)
> |  group by: a.ss_ticket_number
> |  having: count(*) > 9999999999
> |
> 06:EXCHANGE [HASH(a.ss_ticket_number)]
> |
> 03:AGGREGATE [STREAMING]
> |  output: count(*)
> |  group by: a.ss_ticket_number
> |
> 02:HASH JOIN [INNER JOIN, PARTITIONED]
> |  hash predicates: a.ss_item_sk = b.sr_item_sk, a.ss_ticket_number = b.sr_ticket_number
> |  runtime filters: RF000 <- b.sr_item_sk, RF001 <- b.sr_ticket_number
> |
> |--05:EXCHANGE [HASH(b.sr_item_sk,b.sr_ticket_number)]
> |  |
> |  01:SCAN HDFS [tpcds_3000_parquet.store_returns b]
> |     partitions=681/2004 files=681 size=13.73GB
> |
> 04:EXCHANGE [HASH(a.ss_item_sk,a.ss_ticket_number)]
> |
> 00:SCAN HDFS [tpcds_3000_parquet.store_sales a]
>    partitions=683/1824 files=944 size=140.19GB
>    runtime filters: RF000 -> a.ss_item_sk, RF001 -> a.ss_ticket_number
> {code}
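The partitioning loop quoted in the issue can be modelled outside Impala to show what the sender computes per row and why the plan above is correct: exchanges 04 and 05 hash each side on its join keys in the same order, so rows with matching keys land on the same channel index. This is a simplified sketch, not Impala code. `Row`, `Channel`, `HashInt64Fnv`, and `HashPartition` are hypothetical names, columns are plain int64 values, and the 32-bit FNV-1a hash is an assumed stand-in for `RawValue::GetHashValueFnv`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified types: a row is a vector of int64 columns and a
// "channel" just collects the rows routed to it.
using Row = std::vector<int64_t>;
using Channel = std::vector<Row>;

constexpr uint32_t kFnvPrime = 0x01000193;  // 32-bit FNV prime
constexpr uint32_t kFnvSeed = 0x811C9DC5;   // 32-bit FNV offset basis

// 32-bit FNV-1a over the 8 bytes of v (least significant byte first),
// continuing from 'hash'.
uint32_t HashInt64Fnv(int64_t v, uint32_t hash) {
  const uint64_t bits = static_cast<uint64_t>(v);
  for (int b = 0; b < 8; ++b) {
    hash ^= static_cast<uint32_t>((bits >> (8 * b)) & 0xFF);
    hash *= kFnvPrime;
  }
  return hash;
}

// Mirrors the quoted loop: seed the hash, fold in each partition column in
// order, then append the row to channel hash % num_channels.
void HashPartition(const std::vector<Row>& batch,
                   const std::vector<std::size_t>& key_cols,
                   std::vector<Channel>& channels) {
  assert(!channels.empty());
  const uint32_t num_channels = static_cast<uint32_t>(channels.size());
  for (const Row& row : batch) {
    uint32_t hash_val = kFnvSeed;
    for (std::size_t col : key_cols) hash_val = HashInt64Fnv(row[col], hash_val);
    channels[hash_val % num_channels].push_back(row);
  }
}
```

Usage: partition the store_sales side with key columns `{0, 1}` and the store_returns side with `{1, 2}` into the same number of channels; a sales row and a returns row with equal (item, ticket) keys always end up in channels with the same index. This is also why the per-row hash sits on the hot path in the profile: it runs once for every row sent through a HASH exchange.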
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)