Very nice! Thank you.

On Fri, Jan 19, 2018 at 6:16 AM, Quanlong Huang <[email protected]> wrote:
> Hi Tim, I believe it's a bug and found a way to reproduce it.
> Have filed a JIRA: https://issues.apache.org/jira/browse/IMPALA-6423
>
> At 2018-01-17 08:53:44, "Quanlong Huang" <[email protected]> wrote:
> >Thanks, Tim! Let me try to reproduce this scenario on existing scanners.
> >I'll file a JIRA when I find it.
> >
> >At 2018-01-17 08:39:46, "Tim Armstrong" <[email protected]> wrote:
> >>I think there is still probably a bug in the existing scanners where they
> >>can ignore cancellation under specific conditions.
> >>
> >>> For non-MT scanners, why don't they just check RuntimeState::is_cancelled()?
> >>> Are there any reasons that they should go ahead until HdfsScanNode::done()?
> >>I think the non-MT scanners should check both RuntimeState::is_cancelled()
> >>and HdfsScanNode::done(), since they signal different termination conditions.
> >>
> >>On Tue, Jan 16, 2018 at 4:09 PM, Quanlong Huang <[email protected]> wrote:
> >>
> >>> I'm developing the hdfs orc scanner (IMPALA-5717) and encountered such a
> >>> scenario in test_failpoints.py. The existing scanners can pass this test.
> >>> I think this might be my own problem so I haven't filed a JIRA yet.
> >>>
> >>> Just want to confirm that when setting MT_DOP=0, other scanners won't get
> >>> into this scenario. For non-MT scanners, why don't they just check
> >>> RuntimeState::is_cancelled()? Are there any reasons that they should go
> >>> ahead until HdfsScanNode::done()?
> >>>
> >>> At 2018-01-17 07:00:51, "Tim Armstrong" <[email protected]> wrote:
> >>>
> >>> Looks to me like you found a bug. I think the scanners should be checking
> >>> both cancellation conditions, i.e. RuntimeState::is_cancelled_ for MT and
> >>> non-MT scanners and HdfsScanNode::done_ for non-MT scanners.
> >>>
> >>> On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <[email protected]> wrote:
> >>>
> >>>> Hi Tim,
> >>>>
> >>>> Thanks for your reply! I have a further question. When given MT_DOP=0,
> >>>> why don't we use RuntimeState::is_cancelled() to detect cancellation in
> >>>> hdfs scanners? For example, use it in the loop of ProcessSplit.
> >>>> There might be a scenario where the FragmentInstance was cancelled, but
> >>>> the scanner still doesn't know about it and goes ahead and passes up all
> >>>> the row batches. If the FragmentInstance just consists of an HdfsScanNode,
> >>>> the DataStreamSender will try to send these row batches to the upstream
> >>>> FragmentInstance which has been cancelled. Apparently it'll fail, but it
> >>>> will retry for 2 minutes (by default). The memory resources kept by the
> >>>> DataStreamSender cannot be released in this 2-minute window, which might
> >>>> cause other queries running in parallel to raise MemLimitExceeded errors.
> >>>>
> >>>> For example, the plan of query "select 1 from alltypessmall a join
> >>>> alltypessmall b on a.id != b.id" is
> >>>>
> >>>> Max Per-Host Resource Reservation: Memory=0B
> >>>> Per-Host Resource Estimates: Memory=2.06GB
> >>>> WARNING: The following tables are missing relevant table and/or column statistics.
> >>>> functional_orc.alltypessmall
> >>>>
> >>>> F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
> >>>> Per-Host Resources: mem-estimate=0B mem-reservation=0B
> >>>> PLAN-ROOT SINK
> >>>> |  mem-estimate=0B mem-reservation=0B
> >>>> |
> >>>> 04:EXCHANGE [UNPARTITIONED]
> >>>>    mem-estimate=0B mem-reservation=0B
> >>>>    tuple-ids=0,1 row-size=8B cardinality=unavailable
> >>>>
> >>>> F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
> >>>> Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B
> >>>> DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]
> >>>> |  mem-estimate=0B mem-reservation=0B
> >>>> 02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
> >>>> |  predicates: a.id != b.id
> >>>> |  mem-estimate=2.00GB mem-reservation=0B
> >>>> |  tuple-ids=0,1 row-size=8B cardinality=unavailable
> >>>> |
> >>>> |--03:EXCHANGE [BROADCAST]
> >>>> |     mem-estimate=0B mem-reservation=0B
> >>>> |     tuple-ids=1 row-size=4B cardinality=unavailable
> >>>> |
> >>>> 00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]
> >>>>    partitions=4/4 files=4 size=4.82KB
> >>>>    stored statistics:
> >>>>      table: rows=unavailable size=unavailable
> >>>>      partitions: 0/4 rows=unavailable
> >>>>      columns: unavailable
> >>>>    extrapolated-rows=disabled
> >>>>    mem-estimate=32.00MB mem-reservation=0B
> >>>>    tuple-ids=0 row-size=4B cardinality=unavailable
> >>>>
> >>>> F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
> >>>> Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
> >>>> DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]
> >>>> |  mem-estimate=0B mem-reservation=0B
> >>>> 01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]
> >>>>    partitions=4/4 files=4 size=4.82KB
> >>>>    stored statistics:
> >>>>      table: rows=unavailable size=unavailable
> >>>>      partitions: 0/4 rows=unavailable
> >>>>      columns: unavailable
> >>>>    extrapolated-rows=disabled
> >>>>    mem-estimate=32.00MB mem-reservation=0B
> >>>>    tuple-ids=1 row-size=4B cardinality=unavailable
> >>>>
> >>>> When errors happen in F00, a cancellation RPC will be sent to F01. However,
> >>>> the hdfs scanner in F01 does not notice it in time and passes up all the
> >>>> row batches. Then the DataStreamSender will try to send these row batches
> >>>> to F00. It will retry for 2 minutes. In this time window it might hold
> >>>> significant memory resources, which can cause other queries to fail to
> >>>> allocate memory. This could be avoided if the hdfs scanner used
> >>>> RuntimeState::is_cancelled() to detect the cancellation in time.
> >>>>
> >>>> Am I right?
> >>>>
> >>>> Thanks,
> >>>> Quanlong
> >>>>
> >>>> At 2018-01-17 01:05:57, "Tim Armstrong" <[email protected]> wrote:
> >>>> >ScannerContext::cancelled() == true means that the scan has completed,
> >>>> >either because it has returned enough rows, because the query is cancelled,
> >>>> >or because it hit an error.
> >>>> >
> >>>> >RuntimeState::cancelled() == true means that the query is cancelled.
> >>>> >
> >>>> >So there are cases where ScannerContext::cancelled() == true and
> >>>> >RuntimeState::cancelled() is false. E.g. where there's a limit on the scan.
> >>>> >
> >>>> >I think the name of ScannerContext::cancelled() is misleading; it should
> >>>> >probably be called "done()" to match HdfsScanNode::done(). More generally,
> >>>> >the cancellation logic could probably be cleaned up and simplified further.
> >>>> >
> >>>> >On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <[email protected]> wrote:
> >>>> >
> >>>> >> Hi all,
> >>>> >>
> >>>> >> I'm confused about the cancellation logic in hdfs scanners. There are two
> >>>> >> functions to detect cancellation: ScannerContext::cancelled() and
> >>>> >> RuntimeState::is_cancelled().
> >>>> >> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() will
> >>>> >> return HdfsScanNode::done(). However, the field done_ in HdfsScanNode seems
> >>>> >> to be set according to the status returned from scanners.
> >>>> >> I've witnessed some points when RuntimeState::is_cancelled() is true but
> >>>> >> ScannerContext::cancelled() is false.
> >>>> >>
> >>>> >> My question is why scanners don't use RuntimeState::is_cancelled() to
> >>>> >> detect cancellation, which is more timely than using
> >>>> >> ScannerContext::cancelled(). There must be some detailed reasons that I've
> >>>> >> missed. Would you be so kind as to answer my question?
> >>>> >>
> >>>> >> Thanks,
> >>>> >> Quanlong
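
For anyone skimming the thread, the fix being discussed boils down to having the non-MT scan loop watch both termination signals, not just the scan node's done flag. Below is a minimal, self-contained C++ sketch of that idea; QueryState, ScanNodeState and the free function ProcessSplit are illustrative stand-ins, not Impala's actual classes (the real code is RuntimeState and HdfsScanNode in the Impala backend).

#include <atomic>
#include <iostream>

// Query-wide flag, analogous to RuntimeState::is_cancelled(): set when the
// whole query / fragment instance is cancelled (e.g. by a cancellation RPC).
struct QueryState {
  std::atomic<bool> cancelled{false};
  bool IsCancelled() const { return cancelled.load(); }
};

// Per-scan-node flag, analogous to HdfsScanNode::done_: set when the scan
// itself is finished, e.g. the LIMIT was reached or a scanner hit an error.
struct ScanNodeState {
  std::atomic<bool> done{false};
  bool Done() const { return done.load(); }
};

// Simplified non-MT scan loop. The point made in the thread: checking only
// Done() can keep producing row batches after the query was cancelled, so the
// loop should also check IsCancelled() and stop promptly.
int ProcessSplit(const QueryState& query, const ScanNodeState& node) {
  int batches = 0;
  while (!node.Done() && !query.IsCancelled()) {
    // ... materialize one row batch from the split and pass it up ...
    if (++batches == 1000) break;  // stand-in for "split exhausted"
  }
  return batches;
}

int main() {
  QueryState query;
  ScanNodeState node;
  query.cancelled = true;  // simulate the cancellation RPC arriving
  std::cout << "batches produced: " << ProcessSplit(query, node) << "\n";  // prints 0
  return 0;
}

With only the done-style check, a cancelled fragment keeps handing batches to its DataStreamSender, which then retries against the already-cancelled receiver for the default two-minute window while holding memory; checking the query-level flag as well lets the scanner stop before that happens.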
