[
https://issues.apache.org/jira/browse/DRILL-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412523#comment-15412523
]
ASF GitHub Bot commented on DRILL-4833:
---------------------------------------
Github user jinfengni commented on a diff in the pull request:
https://github.com/apache/drill/pull/562#discussion_r73959459
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
---
@@ -72,6 +73,47 @@ public Prel visitScan(ScanPrel prel, MajorFragmentStat s) throws RuntimeException {
     return prel;
   }
+  /**
+   * A union-all should be treated differently compared to a join operator because joins impose
+   * a co-location requirement and therefore insert an exchange on both sides of the
+   * join (e.g. HashToRandomExchange or BroadcastExchange), thus the major fragment of the join itself
+   * is different from the major fragment of its children. Union-All does not impose the co-location
+   * requirement on its children, hence the major fragment of the union-all may be the same as that of
+   * its children. Thus, we should take an 'aggregate' view of all its children to decide the parallelism.
+   */
+  @Override
+  public Prel visitUnionAll(UnionAllPrel prel, MajorFragmentStat s) throws RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    s.add(prel);
+
+    List<MajorFragmentStat> statList = Lists.newArrayList();
+    for (Prel p : prel) {
+      // for each input of union-all, create a temporary MajorFragmentStat instance
+      MajorFragmentStat childStat = new MajorFragmentStat(s /* use existing stat to initialize */);
+      statList.add(childStat);
+      childStat.add(p);
+    }
+
+    int i = 0;
+    for (Prel p : prel) {
+      children.add(p.accept(this, statList.get(i++)));
+    }
+
+    MajorFragmentStat maxStat = statList.get(0);
+    // get the max width of all child stats
+    for (int j = 1; j < statList.size(); j++) {
+      if (statList.get(j).getMaxWidth() > maxStat.getMaxWidth()) {
+        maxStat = statList.get(j);
+      }
+    }
+
+    // width of the major fragment that contains union-all should be the maximum
+    // width of all its inputs
+    s.setMaxWidth(maxStat.getMaxWidth());
--- End diff --
In addition to maxWidth, should we maintain maxRows as well for the stat object of the Union-All operator?
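For illustration, one way the aggregation could cover row counts as well, sketched against the diff above. getMaxRows()/setMaxRows() are hypothetical accessors, not necessarily MajorFragmentStat's real API; statList and s refer to the variables in the diff.
{noformat}
// Hypothetical sketch only -- getMaxRows()/setMaxRows() are assumed accessors on
// MajorFragmentStat and may not match the actual field or method names.
int unionMaxWidth = 0;
double unionMaxRows = 0;
for (MajorFragmentStat childStat : statList) {
  unionMaxWidth = Math.max(unionMaxWidth, childStat.getMaxWidth());
  unionMaxRows = Math.max(unionMaxRows, childStat.getMaxRows());
}
// carry both the widest and the largest-row-count input up to the union-all's fragment
s.setMaxWidth(unionMaxWidth);
s.setMaxRows(unionMaxRows);
{noformat}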
> Union-All with a small cardinality input on one side does not get parallelized
> ------------------------------------------------------------------------------
>
> Key: DRILL-4833
> URL: https://issues.apache.org/jira/browse/DRILL-4833
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.7.0
> Reporter: Aman Sinha
> Assignee: Jinfeng Ni
>
> When a Union-All has an input that is a LIMIT 1 (or some small value relative
> to the slice_target), and that input is accessing Parquet files, Drill does
> an optimization where a single Parquet file is read (based on the rowcount
> statistics in the Parquet file, we determine that reading 1 file is
> sufficient). This also means that the max width for that major fragment is
> set to 1 because only 1 minor fragment is needed to read 1 row-group.
> The net effect is that a width of 1 is applied to the major fragment that
> consists of the union-all and its inputs. This is sub-optimal because it
> prevents parallelization of the other input and of the union-all operator
> itself.
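> To make the mechanism concrete, here is a simplified, hypothetical sketch
> (not Drill's actual classes) of how one shared width limit per major fragment
> collapses parallelism when no exchange separates the union-all from its inputs:
> {noformat}
> // Simplified, hypothetical illustration -- not Drill's actual classes. When no exchange
> // separates the union-all from its inputs, one shared width limit applies to the whole
> // major fragment, so the LIMIT-1 side (width 1) drags every operator down to width 1.
> public class FragmentWidthDemo {
>   static class MajorFragmentWidth {
>     int maxWidth = Integer.MAX_VALUE;
>
>     // every operator in the same major fragment can only lower the shared width
>     void add(int operatorWidthLimit) {
>       maxWidth = Math.min(maxWidth, operatorWidthLimit);
>     }
>   }
>
>   public static void main(String[] args) {
>     MajorFragmentWidth frag = new MajorFragmentWidth();
>     frag.add(20); // scan of the large input could run with 20 minor fragments
>     frag.add(1);  // LIMIT 1 input reads a single row-group, so its width limit is 1
>     // prints 1: the union-all (and the join above it) also end up single-threaded
>     System.out.println("fragment width = " + frag.maxWidth);
>   }
> }
> {noformat}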
> Here's an example query and plan that illustrates the issue:
> {noformat}
> alter session set `planner.slice_target` = 1;
> explain plan for
> (select c.c_nationkey, c.c_custkey, c.c_name
> from
> dfs.`/Users/asinha/data/tpchmulti/customer` c
> inner join
> dfs.`/Users/asinha/data/tpchmulti/nation` n
> on c.c_nationkey = n.n_nationkey)
> union all
> (select c_nationkey, c_custkey, c_name
> from dfs.`/Users/asinha/data/tpchmulti/customer` c limit 1)
> +------+------+
> | text | json |
> +------+------+
> | 00-00 Screen
> 00-01 Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-02 Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-03 UnionAll(all=[true])
> 00-05 Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-07 HashJoin(condition=[=($0, $3)], joinType=[inner])
> 00-10 Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-13 HashToRandomExchange(dist0=[[$0]])
> 01-01 UnorderedMuxExchange
> 03-01 Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
> 03-02 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/customer]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> 00-09 Project(n_nationkey=[$0])
> 00-12 HashToRandomExchange(dist0=[[$0]])
> 02-01 UnorderedMuxExchange
> 04-01 Project(n_nationkey=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
> 04-02 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/nation]], selectionRoot=file:/Users/asinha/data/tpchmulti/nation, numFiles=1, usedMetadataFile=false, columns=[`n_nationkey`]]])
> 00-04 Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-06 SelectionVectorRemover
> 00-08 Limit(fetch=[1])
> 00-11 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/Users/asinha/data/tpchmulti/customer/01.parquet]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> {noformat}
> Note that Union-all and HashJoin are part of fragment 0 (single minor
> fragment) even though they could have been parallelized. This clearly
> affects performance for larger data sets.