Thanks, Tim! Let me try to reproduce this scenario on the existing scanners. I'll file a JIRA once I can reproduce it.
At 2018-01-17 08:39:46, "Tim Armstrong" <[email protected]> wrote:
>I think there is still probably a bug in the existing scanners where they
>can ignore cancellation under specific conditions.
>
>> For non-MT scanners, why don't they just check
>> RuntimeState::is_cancelled()?
>Are there any reasons that they should go ahead until HdfsScanNode::done()?
>I think the non-MT scanners should check both RuntimeState::is_cancelled()
>and HdfsScanNode::done(), since they signal different termination
>conditions.
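
A minimal, self-contained sketch of the kind of combined check being suggested here, using simplified stand-in types rather than the real Impala classes (ProcessSplitSketch is a hypothetical name, not an existing function):

// Simplified stand-ins for the real Impala classes; only meant to show a
// non-MT scanner loop polling BOTH termination signals, since they are set
// on different paths and at different times.
#include <atomic>
#include <cstdio>

struct RuntimeState {
  std::atomic<bool> is_cancelled_{false};  // query-level cancellation
  bool is_cancelled() const { return is_cancelled_.load(); }
};

struct HdfsScanNode {
  std::atomic<bool> done_{false};  // scan finished: limit reached, error, or cancel
  bool done() const { return done_.load(); }
};

// Hypothetical scanner loop: stop as soon as either signal fires instead of
// waiting for HdfsScanNode::done_ alone.
void ProcessSplitSketch(const RuntimeState& state, const HdfsScanNode& node) {
  int batches = 0;
  while (!state.is_cancelled() && !node.done() && batches < 1000) {
    ++batches;  // stand-in for "materialize a row batch and pass it up"
  }
  std::printf("stopped after %d batches (cancelled=%d, done=%d)\n", batches,
              state.is_cancelled() ? 1 : 0, node.done() ? 1 : 0);
}

int main() {
  RuntimeState state;
  HdfsScanNode node;
  state.is_cancelled_ = true;  // query already cancelled: the loop exits immediately
  ProcessSplitSketch(state, node);
  return 0;
}

The point is only that neither signal alone is sufficient: done_ covers limits and scan-local errors, while is_cancelled_ covers query-wide cancellation.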
>
>On Tue, Jan 16, 2018 at 4:09 PM, Quanlong Huang <[email protected]>
>wrote:
>
>> I'm developing the hdfs orc scanner (IMPALA-5717) and encountered such a
>> scenario in test_failpoints.py. The existing scanners can pass this test.
>> I think this might be my own problem, so I haven't filed a JIRA yet.
>>
>> Just want to confirm that when setting MT_DOP=0, other scanners won't get
>> into this scenario. For non-MT scanners, why don't they just check
>> RuntimeState::is_cancelled()? Are there any reasons that they should go
>> ahead until HdfsScanNode::done()?
>>
>> At 2018-01-17 07:00:51, "Tim Armstrong" <[email protected]> wrote:
>>
>> Looks to me like you found a bug. I think the scanners should be checking
>> both cancellation conditions, i.e. RuntimeState::is_cancelled_ for MT and
>> non-MT scanners and HdfsScanNode::done_ for non-MT scanners.
>>
>> On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <[email protected]>
>> wrote:
>>
>>> Hi Tim,
>>>
>>> Thanks for your reply! I have a further question. When given MT_DOP=0,
>>> why don't we use RuntimeState::is_cancelled() to detect cancellation in
>>> the hdfs scanners? For example, use it in the loop of ProcessSplit.
>>> There might be a scenario where the FragmentInstance was cancelled, but
>>> the scanner still doesn't know about it, so it goes ahead and passes up
>>> all the row batches. If the FragmentInstance just consists of an
>>> HdfsScanNode, the DataStreamSender will try to send these row batches to
>>> the upstream FragmentInstance, which has been cancelled. It will
>>> obviously fail, but it will retry for 2 minutes (by default). The memory
>>> resources held by the DataStreamSender cannot be released during this
>>> 2-minute window, which might cause other queries running in parallel to
>>> hit MemLimitExceeded errors.
>>>
>>> For example, the plan of query "select 1 from alltypessmall a join
>>> alltypessmall b on a.id != b.id" is
>>> +------------------------------------------------------------------------------------+
>>> | Max Per-Host Resource Reservation: Memory=0B
>>> | Per-Host Resource Estimates: Memory=2.06GB
>>> | WARNING: The following tables are missing relevant table and/or column statistics.
>>> | functional_orc.alltypessmall
>>> |
>>> | F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
>>> | Per-Host Resources: mem-estimate=0B mem-reservation=0B
>>> |   PLAN-ROOT SINK
>>> |   |  mem-estimate=0B mem-reservation=0B
>>> |   |
>>> |   04:EXCHANGE [UNPARTITIONED]
>>> |      mem-estimate=0B mem-reservation=0B
>>> |      tuple-ids=0,1 row-size=8B cardinality=unavailable
>>> |
>>> | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
>>> | Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B
>>> |   DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]
>>> |   |  mem-estimate=0B mem-reservation=0B
>>> |   02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
>>> |   |  predicates: a.id != b.id
>>> |   |  mem-estimate=2.00GB mem-reservation=0B
>>> |   |  tuple-ids=0,1 row-size=8B cardinality=unavailable
>>> |   |
>>> |   |--03:EXCHANGE [BROADCAST]
>>> |   |     mem-estimate=0B mem-reservation=0B
>>> |   |     tuple-ids=1 row-size=4B cardinality=unavailable
>>> |   |
>>> |   00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]
>>> |      partitions=4/4 files=4 size=4.82KB
>>> |      stored statistics:
>>> |        table: rows=unavailable size=unavailable
>>> |        partitions: 0/4 rows=unavailable
>>> |        columns: unavailable
>>> |      extrapolated-rows=disabled
>>> |      mem-estimate=32.00MB mem-reservation=0B
>>> |      tuple-ids=0 row-size=4B cardinality=unavailable
>>> |
>>> | F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
>>> | Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
>>> |   DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]
>>> |   |  mem-estimate=0B mem-reservation=0B
>>> |   01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]
>>> |      partitions=4/4 files=4 size=4.82KB
>>> |      stored statistics:
>>> |        table: rows=unavailable size=unavailable
>>> |        partitions: 0/4 rows=unavailable
>>> |        columns: unavailable
>>> |      extrapolated-rows=disabled
>>> |      mem-estimate=32.00MB mem-reservation=0B
>>> |      tuple-ids=1 row-size=4B cardinality=unavailable
>>> +------------------------------------------------------------------------------------+
>>>
>>> When errors happen in F00, a cancellation RPC will be sent to F01.
>>> However, the hdfs scanner in F01 does not notice it in time and passes up
>>> all the row batches. Then the DataStreamSender will try to send these row
>>> batches to F00. It will retry for 2 minutes. In this time window it might
>>> hold significant memory resources, which can cause other queries to fail
>>> because they cannot allocate memory. This can be avoided if the hdfs
>>> scanner uses RuntimeState::is_cancelled() to detect the cancellation in
>>> time.
>>>
>>> Am I right?
>>>
>>> Thanks,
>>> Quanlong
>>>
>>> At 2018-01-17 01:05:57, "Tim Armstrong" <[email protected]> wrote:
>>> >ScannerContext::cancelled() == true means that the scan has completed,
>>> >either because it has returned enough rows, because the query is
>>> >cancelled, or because it hit an error.
>>> >
>>> >RuntimeState::cancelled() == true means that the query is cancelled.
>>> >
>>> >So there are cases where ScannerContext::cancelled() == true and
>>> >RuntimeState::cancelled() is false, e.g. where there's a limit on the
>>> >scan.
>>> >
>>> >I think the name of ScannerContext::cancelled() is misleading; it should
>>> >probably be called "done()" to match HdfsScanNode::done().
>>> >More generally, the cancellation logic could probably be cleaned up and
>>> >simplified further.
>>> >
>>> >On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <[email protected]>
>>> >wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> I'm confused about the cancellation logic in hdfs scanners. There are
>>> >> two functions to detect cancellation: ScannerContext::cancelled() and
>>> >> RuntimeState::is_cancelled().
>>> >> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled()
>>> >> will return HdfsScanNode::done(). However, the field done_ in
>>> >> HdfsScanNode seems to be set according to the status returned from
>>> >> scanners. I've witnessed some points when RuntimeState::is_cancelled()
>>> >> is true but ScannerContext::cancelled() is false.
>>> >>
>>> >> My question is why the scanners don't use RuntimeState::is_cancelled()
>>> >> to detect cancellation, which is more timely than using
>>> >> ScannerContext::cancelled(). There must be some detailed reasons that
>>> >> I've missed. Would you be so kind as to answer my question?
>>> >>
>>> >> Thanks,
>>> >> Quanlong
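
For reference, a toy model of the delegation described in this thread (for MT_DOP=0, ScannerContext::cancelled() reflecting HdfsScanNode::done_), showing the window where the query is already cancelled but the scanner-visible check still reports false; the types here are simplified stand-ins, not the actual Impala implementations:

// Toy model of the non-MT delegation described in this thread:
// ScannerContext::cancelled() just reflects HdfsScanNode::done_, which (per
// the observation above) is only set based on statuses returned from
// scanners, so it can lag behind RuntimeState::is_cancelled() when the query
// is cancelled externally.
#include <atomic>
#include <cstdio>

struct RuntimeState {
  std::atomic<bool> is_cancelled_{false};
  bool is_cancelled() const { return is_cancelled_.load(); }
};

struct HdfsScanNode {
  std::atomic<bool> done_{false};
  bool done() const { return done_.load(); }
};

struct ScannerContext {
  const HdfsScanNode* scan_node;
  // Non-MT path: "cancelled" really means "the scan node is done".
  bool cancelled() const { return scan_node->done(); }
};

int main() {
  RuntimeState state;
  HdfsScanNode node;
  ScannerContext ctx{&node};

  state.is_cancelled_ = true;  // the coordinator cancelled the query...
  // ...but no scanner has hit a terminal status yet, so done_ is still false.
  std::printf("RuntimeState::is_cancelled() = %d\n", state.is_cancelled() ? 1 : 0);
  std::printf("ScannerContext::cancelled()  = %d\n", ctx.cancelled() ? 1 : 0);
  return 0;
}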
