[ https://issues.apache.org/jira/browse/DRILL-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411283#comment-15411283 ]

Aman Sinha commented on DRILL-4833:
-----------------------------------

I believe the root cause of the issue is that in the excessive exchange 
identification phase, we don't do any special handling for union-all, so the 
default implementation treats it similarly to any other multi-input operator, 
such as a join. 

Ideally, a union-all should be treated differently from a join. Joins impose a 
co-location requirement, so the planner inserts an exchange on each side of the 
join (e.g. a HashToRandomExchange or BroadcastExchange); thus the major 
fragment of the join itself is distinct from the major fragments of its 
children.  If a LIMIT 1 subquery occurs on one side of the join, its major 
fragment's parallelism does not affect the parent fragment or the sibling 
fragment.  

Union-All does not impose a co-location requirement on its children, hence the 
major fragment of the union-all may be the same as that of its children.  Thus, 
we should take an 'aggregate' view of all its children to decide the 
parallelism.  My proposed fix is based on this reasoning. 
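To illustrate the idea (this is a simplified, hypothetical sketch, not Drill's 
actual code; the class and method names below are made up for illustration): 
when the union-all shares a major fragment with its children, the fragment's 
max width should be the aggregate (maximum) over all children, so a width-1 
LIMIT child cannot cap the whole fragment.

```java
import java.util.Arrays;
import java.util.List;

public class UnionAllWidth {

    // Hypothetical sketch: for a join, each child sits behind an exchange in
    // its own major fragment, so a width-1 child does not constrain the parent.
    // For a union-all, the children may share the parent's major fragment, so
    // the parent's width should be the max over all children's max widths
    // rather than being capped by the smallest one.
    static int unionAllWidth(List<Integer> childMaxWidths) {
        return childMaxWidths.stream().max(Integer::compare).orElse(1);
    }

    public static void main(String[] args) {
        // One child is a LIMIT 1 scan (max width 1), the other a large
        // parallelizable scan (max width 8).
        int width = unionAllWidth(Arrays.asList(1, 8));
        System.out.println(width);  // aggregate view: 8, not 1
    }
}
```

With this aggregate view, the width-8 scan and the union-all itself can still 
be parallelized even though the LIMIT 1 side only needs a single minor 
fragment.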

> Union-All with a LIMIT 1 on one side does not get parallelized
> --------------------------------------------------------------
>
>                 Key: DRILL-4833
>                 URL: https://issues.apache.org/jira/browse/DRILL-4833
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.7.0
>            Reporter: Aman Sinha
>            Assignee: Aman Sinha
>
> When a Union-All has an input that is a LIMIT 1 (or some small value relative 
> to the slice_target), and that input is accessing Parquet files, Drill does 
> an optimization where a single Parquet file is read (based on the rowcount 
> statistics in the Parquet file, we determine that reading 1 file is 
> sufficient).  This also means that the max width for that major fragment is 
> set to 1 because only 1 minor fragment is needed to read 1 row-group. 
> The net effect is that a width of 1 is applied to the major fragment 
> containing the union-all and its inputs.  This is sub-optimal because it 
> prevents parallelization of the other input and of the union-all operator 
> itself.  
> Here's an example query and plan that illustrates the issue: 
> {noformat}
> alter session set `planner.slice_target` = 1;
> explain plan for 
> (select c.c_nationkey, c.c_custkey, c.c_name
> from
> dfs.`/Users/asinha/data/tpchmulti/customer` c
> inner join
> dfs.`/Users/asinha/data/tpchmulti/nation`  n
> on c.c_nationkey = n.n_nationkey)
> union all
> (select c_nationkey, c_custkey, c_name
> from dfs.`/Users/asinha/data/tpchmulti/customer` c limit 1)
> +------+------+
> | text | json |
> +------+------+
> | 00-00    Screen
> 00-01      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-02        Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-03          UnionAll(all=[true])
> 00-05            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-07              HashJoin(condition=[=($0, $3)], joinType=[inner])
> 00-10                Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-13                  HashToRandomExchange(dist0=[[$0]])
> 01-01                    UnorderedMuxExchange
> 03-01                      Project(c_nationkey=[$0], c_custkey=[$1], 
> c_name=[$2], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
> 03-02                        Scan(groupscan=[ParquetGroupScan 
> [entries=[ReadEntryWithPath 
> [path=file:/Users/asinha/data/tpchmulti/customer]], 
> selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, 
> usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> 00-09                Project(n_nationkey=[$0])
> 00-12                  HashToRandomExchange(dist0=[[$0]])
> 02-01                    UnorderedMuxExchange
> 04-01                      Project(n_nationkey=[$0], 
> E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
> 04-02                        Scan(groupscan=[ParquetGroupScan 
> [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/nation]], 
> selectionRoot=file:/Users/asinha/data/tpchmulti/nation, numFiles=1, 
> usedMetadataFile=false, columns=[`n_nationkey`]]])
> 00-04            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-06              SelectionVectorRemover
> 00-08                Limit(fetch=[1])
> 00-11                  Scan(groupscan=[ParquetGroupScan 
> [entries=[ReadEntryWithPath 
> [path=/Users/asinha/data/tpchmulti/customer/01.parquet]], 
> selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, 
> usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> {noformat}
> Note that Union-all and HashJoin are part of fragment 0 (single minor 
> fragment) even though they could have been parallelized.  This clearly 
> affects performance for larger data sets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)