Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1110#discussion_r167589289
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
---
@@ -112,6 +120,73 @@ public RelWriter explainTerms(RelWriter pw) {
return pw;
}
+ /**
+ * This method creates a new UnorderedMux and Demux exchanges if mux
operators are enabled.
+ * @param child input to the new Unordered[Mux/Demux]Prel or new
HashToRandomExchange node.
+ * @param options options manager to check if mux is enabled.
+ */
+ @Override
+ public Prel getMuxPrel(Prel child, OptionManager options) {
+ boolean isMuxEnabled =
options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
+ Prel newPrel = child;
+
+ final List<String> childFields = child.getRowType().getFieldNames();
+
+ List <RexNode> removeUpdatedExpr = null;
+
+ if (isMuxEnabled) {
+ // Insert Project Operator with new column that will be a hash for
HashToRandomExchange fields
+ final List<DistributionField> distFields = getFields();
+ final List<String> outputFieldNames =
Lists.newArrayList(childFields);
+ final RexBuilder rexBuilder = getCluster().getRexBuilder();
+ final List<RelDataTypeField> childRowTypeFields =
child.getRowType().getFieldList();
+
+ final HashPrelUtil.HashExpressionCreatorHelper<RexNode> hashHelper =
new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
+ final List<RexNode> distFieldRefs =
Lists.newArrayListWithExpectedSize(distFields.size());
+ for(int i=0; i<distFields.size(); i++) {
+ final int fieldId = distFields.get(i).getFieldId();
+
distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(),
fieldId));
+ }
+
+ final List <RexNode> updatedExpr =
Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+ removeUpdatedExpr =
Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+ for ( RelDataTypeField field : childRowTypeFields) {
+ RexNode rex = rexBuilder.makeInputRef(field.getType(),
field.getIndex());
+ updatedExpr.add(rex);
+ removeUpdatedExpr.add(rex);
+ }
+
+ outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
+ final RexNode distSeed =
rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); //
distribution seed
+
updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs,
distSeed, hashHelper));
+
+ RelDataType rowType =
RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr,
outputFieldNames);
+
+ ProjectPrel addColumnprojectPrel = new
ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr,
rowType);
+
+ newPrel = new
UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(),
addColumnprojectPrel.getTraitSet(),
+ addColumnprojectPrel);
+ }
+
+ newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(),
newPrel, getFields());
+
+ if
(options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) {
+ HashToRandomExchangePrel hashExchangePrel =
(HashToRandomExchangePrel) newPrel;
+ // Insert a DeMuxExchange to narrow down the number of receivers
+ newPrel = new UnorderedDeMuxExchangePrel(getCluster(),
getTraitSet(), hashExchangePrel,
+ hashExchangePrel.getFields());
+ }
+
+ if ( isMuxEnabled ) {
--- End diff --
Use consistent formating.
---