I'm developing the HDFS ORC scanner (IMPALA-5717) and ran into this scenario 
in test_failpoints.py. The existing scanners pass this test. I think this 
might be a problem in my own code, so I haven't filed a JIRA yet.


I just want to confirm that with MT_DOP=0, the other scanners won't get into 
this scenario. For non-MT scanners, why don't they simply check 
RuntimeState::is_cancelled()? Is there any reason they should keep going 
until HdfsScanNode::done() returns true?


At 2018-01-17 07:00:51, "Tim Armstrong" <[email protected]> wrote:

Looks to me like you found a bug. I think the scanners should be checking both 
cancellation conditions, i.e. RuntimeState::is_cancelled_ for MT and non-MT 
scanners and hdfs_scan_node::done_ for non-MT scanners.
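
A minimal sketch of what "checking both conditions" could look like. The types 
below (QueryState, ScanNodeState, ShouldStopScanning) are hypothetical stand-ins 
written for illustration, not Impala's actual classes; they only model 
RuntimeState::is_cancelled() and HdfsScanNode::done() as two independent flags.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the query-level state. It models only the
// behaviour of RuntimeState::is_cancelled() discussed in this thread.
struct QueryState {
  std::atomic<bool> cancelled{false};
  bool is_cancelled() const { return cancelled.load(); }
};

// Hypothetical stand-in for the non-MT scan node. It models only
// HdfsScanNode::done(), which can become true without a cancellation
// (for example when a limit is reached).
struct ScanNodeState {
  std::atomic<bool> done_flag{false};
  bool done() const { return done_flag.load(); }
};

// Returns true if a scanner should stop producing row batches.
// `node` is null for an MT scanner, which has no shared scan node to consult,
// so only the query-level flag is checked in that case.
inline bool ShouldStopScanning(const QueryState& query,
                               const ScanNodeState* node) {
  if (query.is_cancelled()) return true;      // checked by MT and non-MT
  return node != nullptr && node->done();     // checked by non-MT only
}
```

With this shape, a non-MT scanner stops either when the scan node is done or 
when the query is cancelled, whichever is observed first.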



On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <[email protected]> wrote:

Hi Tim, 


Thanks for your reply! I have a further question. With MT_DOP=0, why 
don't we use RuntimeState::is_cancelled() to detect cancellation in the HDFS 
scanners? For example, we could check it in the loop of ProcessSplit.
There might be a scenario where the FragmentInstance was cancelled, but the 
scanner still doesn't know about it and goes ahead and passes up all the row 
batches. If the FragmentInstance consists of just an HdfsScanNode, the 
DataStreamSender will try to send these row batches to the upstream 
FragmentInstance, which has already been cancelled. The send will fail, but it 
is retried for 2 minutes (by default). The memory held by the 
DataStreamSender cannot be released during this 2-minute window, which might 
cause other queries running in parallel to hit a MemLimitExceeded error. 
For example, the plan of query "select 1 from alltypessmall a join 
alltypessmall b on a.id != b.id" is 
Max Per-Host Resource Reservation: Memory=0B
Per-Host Resource Estimates: Memory=2.06GB
WARNING: The following tables are missing relevant table and/or column statistics.
functional_orc.alltypessmall

F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
Per-Host Resources: mem-estimate=0B mem-reservation=0B
  PLAN-ROOT SINK
  |  mem-estimate=0B mem-reservation=0B
  |
  04:EXCHANGE [UNPARTITIONED]
     mem-estimate=0B mem-reservation=0B
     tuple-ids=0,1 row-size=8B cardinality=unavailable

F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B
  DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]
  |  mem-estimate=0B mem-reservation=0B
  02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
  |  predicates: a.id != b.id
  |  mem-estimate=2.00GB mem-reservation=0B
  |  tuple-ids=0,1 row-size=8B cardinality=unavailable
  |
  |--03:EXCHANGE [BROADCAST]
  |     mem-estimate=0B mem-reservation=0B
  |     tuple-ids=1 row-size=4B cardinality=unavailable
  |
  00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]
     partitions=4/4 files=4 size=4.82KB
     stored statistics:
       table: rows=unavailable size=unavailable
       partitions: 0/4 rows=unavailable
       columns: unavailable
     extrapolated-rows=disabled
     mem-estimate=32.00MB mem-reservation=0B
     tuple-ids=0 row-size=4B cardinality=unavailable

F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
  DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]
  |  mem-estimate=0B mem-reservation=0B
  01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]
     partitions=4/4 files=4 size=4.82KB
     stored statistics:
       table: rows=unavailable size=unavailable
       partitions: 0/4 rows=unavailable
       columns: unavailable
     extrapolated-rows=disabled
     mem-estimate=32.00MB mem-reservation=0B
     tuple-ids=1 row-size=4B cardinality=unavailable


When an error happens in F00, a cancellation RPC is sent to F01. However, the 
HDFS scanner in F01 does not notice it in time and passes up all the row 
batches. The DataStreamSender in F01 then tries to send these row batches to 
F00 and retries for 2 minutes. During this window it might hold significant 
memory, so other queries may fail to allocate memory. This can be avoided if 
the HDFS scanner uses RuntimeState::is_cancelled() to detect the cancellation 
promptly.
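
To make the difference concrete, here is a toy model of a ProcessSplit-style 
loop. It is only a sketch under stated assumptions: BatchesPassedUp and its 
parameters are hypothetical names, and the "cancellation RPC" is simulated by 
flipping a flag at a fixed iteration rather than by any real Impala mechanism.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of a scanner loop over one split.
// total_batches:         row batches the split would produce if fully scanned.
// cancel_after:          number of batches already passed up when the
//                        (simulated) cancellation arrives.
// check_query_cancelled: whether the loop polls the query-level cancel flag.
// Returns how many row batches are passed up to the sender.
int BatchesPassedUp(int total_batches, int cancel_after,
                    bool check_query_cancelled) {
  std::atomic<bool> query_cancelled{false};
  int passed_up = 0;
  for (int i = 0; i < total_batches; ++i) {
    // Simulated arrival of the cancellation while the scan is in progress.
    if (i == cancel_after) query_cancelled = true;
    // With the check enabled, the scanner stops as soon as it sees the flag.
    if (check_query_cancelled && query_cancelled.load()) break;
    ++passed_up;  // this batch would be handed to the DataStreamSender
  }
  return passed_up;
}
```

In this model, a loop that never polls the flag hands every batch of the split 
to the sender, while a loop that polls it stops at the cancellation point, so 
the sender has nothing further to retry.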


Am I right? 


Thanks,
Quanlong


At 2018-01-17 01:05:57, "Tim Armstrong" <[email protected]> wrote:
>ScannerContext::cancelled() == true means that the scan has completed,
>either because it has returned enough rows, because the query is cancelled,
>or because it hit an error.
>
>RuntimeState::is_cancelled() == true means that the query is cancelled.
>
>So there are cases where ScannerContext::cancelled() == true and
>RuntimeState::is_cancelled() is false, e.g. where there's a limit on the scan.
>
>I think the name of ScannerContext::cancelled() is misleading, it should
>probably be called "done()" to match HdfsScanNode::done(). More generally,
>the cancellation logic could probably be cleaned up and simplified further.
>
>On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <[email protected]>
>wrote:
>
>> Hi all,
>>
>>
>> I'm confused about the cancellation logic in the HDFS scanners. There are
>> two functions to detect cancellation: ScannerContext::cancelled() and
>> RuntimeState::is_cancelled().
>> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() will
>> return HdfsScanNode::done(). However, the field done_ in HdfsScanNode seems
>> to be set according to the status returned from the scanners.
>> I've observed cases where RuntimeState::is_cancelled() is true but
>> ScannerContext::cancelled() is false.
>>
>>
>> My question is why scanners don't use RuntimeState::is_cancelled() to
>> detect cancellation, which is more timely than using
>> ScannerContext::cancelled(). There must be some detailed reasons that I've
>> missed. Would you be so kind as to answer my question?
>>
>>
>> Thanks,
>> Quanlong





 

