[ https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361120#comment-16361120 ]
ASF GitHub Bot commented on DRILL-6115:
---------------------------------------
Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1110#discussion_r167589289
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---
@@ -112,6 +120,73 @@ public RelWriter explainTerms(RelWriter pw) {
return pw;
}
+  /**
+   * This method creates a new UnorderedMux and Demux exchanges if mux operators are enabled.
+   * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel getMuxPrel(Prel child, OptionManager options) {
+    boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
+    Prel newPrel = child;
+
+    final List<String> childFields = child.getRowType().getFieldNames();
+
+    List <RexNode> removeUpdatedExpr = null;
+
+    if (isMuxEnabled) {
+      // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields
+      final List<DistributionField> distFields = getFields();
+      final List<String> outputFieldNames = Lists.newArrayList(childFields);
+      final RexBuilder rexBuilder = getCluster().getRexBuilder();
+      final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList();
+
+      final HashPrelUtil.HashExpressionCreatorHelper<RexNode> hashHelper = new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
+      final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size());
+      for(int i=0; i<distFields.size(); i++) {
+        final int fieldId = distFields.get(i).getFieldId();
+        distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(), fieldId));
+      }
+
+      final List <RexNode> updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+      removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+      for ( RelDataTypeField field : childRowTypeFields) {
+        RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex());
+        updatedExpr.add(rex);
+        removeUpdatedExpr.add(rex);
+      }
+
+      outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
+      final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed
+      updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper));
+
+      RelDataType rowType = RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr, outputFieldNames);
+
+      ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType);
+
+      newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(),
+          addColumnprojectPrel);
+    }
+
+    newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(), newPrel, getFields());
+
+    if (options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) {
+      HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
+      // Insert a DeMuxExchange to narrow down the number of receivers
+      newPrel = new UnorderedDeMuxExchangePrel(getCluster(), getTraitSet(), hashExchangePrel,
+          hashExchangePrel.getFields());
+    }
+
+    if ( isMuxEnabled ) {
--- End diff ---
Use consistent formatting.
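Presumably this refers to the spacing inside the parentheses: the same check is written as `if (isMuxEnabled) {` earlier in the method but as `if ( isMuxEnabled ) {` on the line this comment is attached to. A small, hypothetical illustration of the two styles (not part of the patch):
{code:java}
public class SpacingStyles {
  public static void main(String[] args) {
    boolean isMuxEnabled = true;

    if (isMuxEnabled) {       // style used by the earlier checks in getMuxPrel()
      System.out.println("no spaces inside the parentheses");
    }

    if ( isMuxEnabled ) {     // style flagged by this review comment
      System.out.println("spaces inside the parentheses");
    }
  }
}
{code}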
> SingleMergeExchange is not scaling up when many minor fragments are allocated
> for a query.
> ------------------------------------------------------------------------------------------
>
> Key: DRILL-6115
> URL: https://issues.apache.org/jira/browse/DRILL-6115
> Project: Apache Drill
> Issue Type: Improvement
> Components: Execution - Relational Operators
> Affects Versions: 1.12.0
> Reporter: Hanumath Rao Maduri
> Assignee: Hanumath Rao Maduri
> Priority: Major
> Fix For: 1.13.0
>
> Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output.
> The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +------+------+
> | text | json |
> +------+------+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///drill/tables/lineitem, condition=null], columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10 node cluster, if the table is huge, Drill can spawn many minor
> fragments that are all merged on a single node by one merge receiver. This
> creates a lot of memory pressure on the receiver node and becomes an
> execution bottleneck. To address this issue, the merge receiver should be a
> multiphase merge receiver.
> Ideally, for a large cluster one could introduce tree merges so that merging
> is done in parallel. But as a first step I think it is better to use the
> existing infrastructure for multiplexing operators to generate an OrderedMux,
> so that all the minor fragments pertaining to one Drillbit are merged locally
> and the merged data is sent across to the receiver operator.
> For example, on a 10 node cluster where each node processes 14 minor
> fragments, the current version of the code merges 140 minor fragments at the
> single receiver, while the proposed version performs two levels of merges: a
> 14-way merge inside each Drillbit, done in parallel, followed by a 10-way
> merge of the pre-merged streams at the receiver node (see the sketch below).
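> A minimal, hypothetical sketch of that fan-in arithmetic (illustration only;
> the class and variable names below are made up and not part of the patch):
> {code:java}
> // Compares the merge fan-in at the receiving fragment for the current
> // single-level merge with the proposed two-level (per-Drillbit mux + final
> // merge) approach, using the example numbers from this issue.
> public class MergeFanInSketch {
>   public static void main(String[] args) {
>     final int drillbits = 10;          // nodes in the example cluster
>     final int fragmentsPerNode = 14;   // minor fragments per Drillbit
>
>     // Current plan: the single merge receiver merges every minor fragment directly.
>     final int singleLevelFanIn = drillbits * fragmentsPerNode;   // 10 * 14 = 140 streams
>
>     // Proposed plan: each Drillbit first merges its own fragments in parallel,
>     // then the receiver merges one pre-merged stream per Drillbit.
>     final int perDrillbitFanIn = fragmentsPerNode;               // 14 streams, local
>     final int finalMergeFanIn = drillbits;                       // 10 streams, at the receiver
>
>     System.out.println("single-level merge fan-in: " + singleLevelFanIn);
>     System.out.println("two-level merge fan-in: " + perDrillbitFanIn
>         + " per Drillbit (parallel), then " + finalMergeFanIn + " at the receiver");
>   }
> }
> {code}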
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)