[ 
https://issues.apache.org/jira/browse/IMPALA-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720709#comment-16720709
 ] 

Michael Ho commented on IMPALA-6692:
------------------------------------

{quote}
Have we seen problems with any operators except for sort? The agg and join 
should be better behaved in that they don't do such large chunks of work 
in-between pulling batches from their children. There are probably some tweaks 
we could do to improve those further.
{quote}

Just attached [^profile-spilling.txt] as an example of the same behavior due to 
spilling. In particular, multiple agg running simultaneously on 
{{philip-bigg-12.vpc.cloudera.com}} were spilling heavily and that indirectly 
slowed down other fragments down.

{quote}
So even though other operators like join/groupby try to be more incremental in 
work, I'd be surprised if they don't have 99th percentile latencies in the 100s 
of milliseconds just due to process-level or OS-level stuff, at which point 
most of your servers will be 90% idle with no apparent bottleneck.
{quote}

I agree. That's the nature of the distributed execution. It's as fast as the 
slowest node. In [^profile-spilling.txt], a single instance of slow agg 
operator slows down all other instances of the fragments which feed into the 
agg operators. That's bad as resources (e.g. memory) are held up unnecessarily 
by the stalled fragments which may eventually make the problem worse.

As discussed in previous comments, the general fix proposed is to add more 
buffering in either the sender or receiver side. We already have some limited 
buffering (10MB) at the receiver side. It seems natural to extend the Exchange 
operator to support spilling. which enables more aggressive buffering at the 
receiver side. Doing so will also make the code simpler as we can remove the 
logic for deferred RPC replies from {{KrpcDataStreamRecvr}}. On the other hand, 
adding spilling to the Exchange operator may make the slow nodes even slower by 
piling more IO on it.

An alternate is to add spilling to the sender side but if we may have to do so 
per-channel, it's unclear how it will affect the minimum memory reservation for 
queries in large cluster deployment. [~tarmstrong], any idea ?

> When partition exchange is followed by sort each sort node becomes a 
> synchronization point across the cluster
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-6692
>                 URL: https://issues.apache.org/jira/browse/IMPALA-6692
>             Project: IMPALA
>          Issue Type: Sub-task
>          Components: Backend, Distributed Exec
>    Affects Versions: Impala 2.10.0
>            Reporter: Mostafa Mokhtar
>            Priority: Critical
>              Labels: perf, resource-management
>         Attachments: Kudu table insert without KRPC no sort.txt, Kudu table 
> insert without KRPC.txt, kudu_partial_sort_insert_vd1129.foo.com_2.txt, 
> profile-spilling.txt
>
>
> Issue described in this JIRA applies to 
> * Analytical functions
> * Writes to Partitioned Parquet tables
> * Writes to Kudu tables
> When inserting into a Kudu table from Impala the plan is something like HDFS 
> SCAN -> Partition Exchange -> Partial Sort -> Kudu Insert.
> The query initially makes good progress then significantly slows down and 
> very few nodes make progress.
> While the insert is running the query goes through different phases 
> * Phase 1
> ** Scan is reading data fast, sending data through to exchange 
> ** Partial Sort keeps accumulating batches
> ** Network and CPU is busy, life appears to be OK
> * Phase 2
> ** One of the Sort operators reaches its memory limit and stops calling 
> ExchangeNode::GetNext for a while
> ** This creates back pressure against the DataStreamSenders
> ** The Partial Sort doesn't call GetNext until it has finished sorting GBs of 
> data (Partial sort memory is unbounded as of 03/16/2018)
> ** All exchange operators in the cluster eventually get blocked on that Sort 
> operator and can no longer make progress
> ** After a while the Sort is able to accept more batches which temporarily 
> unblocks execution across the cluster
> ** Another sort operator reaches its memory limit and this loop repeats itself
> Below are stacks from one of the blocked hosts
> _Sort node waiting on data from exchange node as it didn't start sorting 
> since the memory limit for the sort wasn't reached_
> {code}
> Thread 90 (Thread 0x7f8d7d233700 (LWP 21625)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from 
> /lib64/libpthread.so.0
> #1  0x00007fab1422174c in 
> std::condition_variable::wait(std::unique_lock<std::mutex>&) () from 
> /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.205/lib/impala/lib/libstdc++.so.6
> #2  0x0000000000b4d5aa in void 
> std::_V2::condition_variable_any::wait<boost::unique_lock<impala::SpinLock> 
> >(boost::unique_lock<impala::SpinLock>&) ()
> #3  0x0000000000b4ab6a in 
> impala::KrpcDataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
> #4  0x0000000000b4b0c8 in 
> impala::KrpcDataStreamRecvr::GetBatch(impala::RowBatch**) ()
> #5  0x0000000000dca7c5 in 
> impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
> #6  0x0000000000dcacae in 
> impala::ExchangeNode::GetNext(impala::RuntimeState*, impala::RowBatch*, 
> bool*) ()
> #7  0x0000000001032ac3 in 
> impala::PartialSortNode::GetNext(impala::RuntimeState*, impala::RowBatch*, 
> bool*) ()
> #8  0x0000000000ba9c92 in impala::FragmentInstanceState::ExecInternal() ()
> #9  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> #10 0x0000000000b9ab1a in 
> impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
> #11 0x0000000000d5da9f in 
> impala::Thread::SuperviseThread(std::basic_string<char, 
> std::char_traits<char>, std::allocator<char> > const&, 
> std::basic_string<char, std::char_traits<char>, std::allocator<char> > 
> const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, 
> impala::Promise<long>*) ()
> #12 0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, 
> void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> 
> > const&, std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> > const&, boost::function<void ()()>, 
> impala::ThreadDebugInfo const*, impala::Promise<long>*), 
> boost::_bi::list5<boost::_bi::value<std::basic_string<char, 
> std::char_traits<char>, std::allocator<char> > >, 
> boost::_bi::value<std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, 
> boost::_bi::value<impala::ThreadDebugInfo*>, 
> boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #13 0x00000000012d70ba in thread_proxy ()
> #14 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #15 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}
> _DataStreamSender blocked due to back pressure from the DataStreamRecvr on 
> the node which has a Sort that is spilling_
> {code}
> Thread 89 (Thread 0x7fa8f6a15700 (LWP 21626)):
> #0  0x0000003a6f00ba5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from 
> /lib64/libpthread.so.0
> #1  0x0000000001237e77 in 
> impala::KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<impala::SpinLock>*)
>  ()
> #2  0x0000000001238b8d in 
> impala::KrpcDataStreamSender::Channel::TransmitData(impala::OutboundRowBatch 
> const*) ()
> #3  0x0000000001238ca9 in 
> impala::KrpcDataStreamSender::Channel::SerializeAndSendBatch(impala::RowBatch*)
>  ()
> #4  0x0000000001238d2e in 
> impala::KrpcDataStreamSender::Channel::SendCurrentBatch() ()
> #5  0x000000000123949f in 
> impala::KrpcDataStreamSender::Send(impala::RuntimeState*, impala::RowBatch*) 
> ()
> #6  0x0000000000ba9d47 in impala::FragmentInstanceState::ExecInternal() ()
> #7  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> {code}
> _Scan node blocked due to back pressure from the DataStreamSender_
> {code}
> Thread 68 (Thread 0x7fa929667700 (LWP 21648)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from 
> /lib64/libpthread.so.0
> #1  0x0000000000dc9c60 in bool 
> impala::BlockingQueue<std::unique_ptr<impala::RowBatch, 
> std::default_delete<impala::RowBatch> > 
> >::BlockingPut<std::unique_ptr<impala::RowBatch, 
> std::default_delete<impala::RowBatch> > >(std::unique_ptr<impala::RowBatch, 
> std::default_delete<impala::RowBatch> >&&) ()
> #2  0x0000000000dc61e6 in 
> impala::ExecNode::RowBatchQueue::AddBatch(std::unique_ptr<impala::RowBatch, 
> std::default_delete<impala::RowBatch> >) ()
> #3  0x0000000000dd1ca8 in 
> impala::HdfsScanNode::AddMaterializedRowBatch(std::unique_ptr<impala::RowBatch,
>  std::default_delete<impala::RowBatch> >) ()
> #4  0x0000000000e08adb in impala::HdfsParquetScanner::ProcessSplit() ()
> #5  0x0000000000dd219d in 
> impala::HdfsScanNode::ProcessSplit(std::vector<impala::FilterContext, 
> std::allocator<impala::FilterContext> > const&, impala::MemPool*, 
> impala::io::ScanRange*) ()
> #6  0x0000000000dd3a12 in impala::HdfsScanNode::ScannerThread() ()
> #7  0x0000000000d5da9f in 
> impala::Thread::SuperviseThread(std::basic_string<char, 
> std::char_traits<char>, std::allocator<char> > const&, 
> std::basic_string<char, std::char_traits<char>, std::allocator<char> > 
> const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, 
> impala::Promise<long>*) ()
> #8  0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, 
> void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> 
> > const&, std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> > const&, boost::function<void ()()>, 
> impala::ThreadDebugInfo const*, impala::Promise<long>*), 
> boost::_bi::list5<boost::_bi::value<std::basic_string<char, 
> std::char_traits<char>, std::allocator<char> > >, 
> boost::_bi::value<std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, 
> boost::_bi::value<impala::ThreadDebugInfo*>, 
> boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #9  0x00000000012d70ba in thread_proxy ()
> #10 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #11 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-all-unsubscr...@impala.apache.org
For additional commands, e-mail: issues-all-h...@impala.apache.org

Reply via email to