Hi Tim, 

Thanks for your reply! I have a further question. When given MT_DOP=0, why 
don't we use RuntimeState::is_cancelled() to detect cancellation in hdfs 
scanners? For example, use it in the loop of ProcessSplit.
There might be a scenario that the FragementInstance was canceled, but the 
scanner still don't know about it and then go ahead and pass up all the row 
batches. If the FragementInstance just consists of an HdfsScanNode, the 
DataStreamSender will try to send these row batches to the upstream 
FragmentInstance which has been cancelled. Apparently it'll fail but it will 
retry for 2 minutes (in default). The memory resources kept by the 
DataStreamSender cannot be released in this 2 minutes window, which might cause 
other queries in parallel raising MemLimitExceeded error. 
For example, the plan of query "select 1 from alltypessmall a join 
alltypessmall b on a.id != b.id" is 
+------------------------------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=0B                                  
     |
| Per-Host Resource Estimates: Memory=2.06GB                                    
     |
| WARNING: The following tables are missing relevant table and/or column 
statistics. |
| functional_orc.alltypessmall                                                  
     |
|                                                                               
     |
| F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1                         
     |
| Per-Host Resources: mem-estimate=0B mem-reservation=0B                        
     |
|   PLAN-ROOT SINK                                                              
     |
|   |  mem-estimate=0B mem-reservation=0B                                       
     |
|   |                                                                           
     |
|   04:EXCHANGE [UNPARTITIONED]                                                 
     |
|      mem-estimate=0B mem-reservation=0B                                       
     |
|      tuple-ids=0,1 row-size=8B cardinality=unavailable                        
     |
|                                                                               
     |
| F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3                                
     |
| Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B                    
     |
|   DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]                  
     |
|   |  mem-estimate=0B mem-reservation=0B                                       
     |
|   02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]                                 
     |
|   |  predicates: a.id != b.id                                                 
     |
|   |  mem-estimate=2.00GB mem-reservation=0B                                   
     |
|   |  tuple-ids=0,1 row-size=8B cardinality=unavailable                        
     |
|   |                                                                           
     |
|   |--03:EXCHANGE [BROADCAST]                                                  
     |
|   |     mem-estimate=0B mem-reservation=0B                                    
     |
|   |     tuple-ids=1 row-size=4B cardinality=unavailable                       
     |
|   |                                                                           
     |
|   00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]                       
     |
|      partitions=4/4 files=4 size=4.82KB                                       
     |
|      stored statistics:                                                       
     |
|        table: rows=unavailable size=unavailable                               
     |
|        partitions: 0/4 rows=unavailable                                       
     |
|        columns: unavailable                                                   
     |
|      extrapolated-rows=disabled                                               
     |
|      mem-estimate=32.00MB mem-reservation=0B                                  
     |
|      tuple-ids=0 row-size=4B cardinality=unavailable                          
     |
|                                                                               
     |
| F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3                                
     |
| Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B                   
     |
|   DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]                      
     |
|   |  mem-estimate=0B mem-reservation=0B                                       
     |
|   01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]                       
     |
|      partitions=4/4 files=4 size=4.82KB                                       
     |
|      stored statistics:                                                       
     |
|        table: rows=unavailable size=unavailable                               
     |
|        partitions: 0/4 rows=unavailable                                       
     |
|        columns: unavailable                                                   
     |
|      extrapolated-rows=disabled                                               
     |
|      mem-estimate=32.00MB mem-reservation=0B                                  
     |
|      tuple-ids=1 row-size=4B cardinality=unavailable                          
     |
+------------------------------------------------------------------------------------+


When errors happen in F00, cancellation rpc will be sent to F01. However, the 
hdfs scanner in F01 does not notice it in time and pass up all the row batches. 
Then the DataStreamSender will try to send these row batches to F01. It will 
retry for 2 minutes. In this time window it might hold significant memory 
resources, which causes other queries cannot allocate memory and fail. This can 
be avoid if the hdfs scanner use RuntimeState::is_cancelled() to detect the 
cancellation in time.


Am I right? 


Thanks,
Quanlong


At 2018-01-17 01:05:57, "Tim Armstrong" <[email protected]> wrote:
>ScannerContext::cancelled() == true means that the scan has completed,
>either because it has returned enough rows, because the query is cancelled,
>or because it hit an error.
>
>RuntimeState::cancelled() == true means that the query is cancelled.
>
>So there are cases where ScannerContext::cancelled() == true and
>RuntimeState::cancelled() is false. E.g. where there's a limit on the scan.
>
>I think the name of ScannerContext::cancelled() is misleading, it should
>probably be called "done()" to match HdfsScanNode::done(). More generally,
>the cancellation logic could probably be cleaned up and simplified further.
>
>On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <[email protected]>
>wrote:
>
>> Hi all,
>>
>>
>> I'm confused about the cancellation logic in hdfs scanners. There're two
>> functions to detect cancellation: ScannerContext::cancelled() and
>> RuntimeState::is_cancelled().
>> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() will
>> return HdfsScanNode::done(). However, the field done_ in HdfsScanNode seems
>> to be set according to status return from scanners.
>> I've witnessed some points when RuntimeState::is_cancelled() is true but
>> ScannerContext::cancelled() is false.
>>
>>
>> My question is why scanners don't use RuntimeState::is_cancelled() to
>> detect cancellation, which is more timely than using
>> ScannerContext::cancelled(). There must be some detailed reasons that I've
>> missed. Would you be so kind to answer my question?
>>
>>
>> Thanks,
>> Quanlong

Reply via email to