DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

DRILL-6115: Refactor the existing code so that each ExchangePrel builds its own mux/demux exchanges via constructMuxPrel.

close apache/drill#1110


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f203ea9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f203ea9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f203ea9

Branch: refs/heads/master
Commit: 4f203ea99e2d6bd5596672a0ea67445720630228
Parents: 8cb3234
Author: Hanumath Rao Maduri <hanu....@gmail.com>
Authored: Mon Feb 12 18:19:07 2018 -0800
Committer: Aman Sinha <asi...@maprtech.com>
Committed: Fri Feb 23 14:13:40 2018 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../physical/base/AbstractPhysicalVisitor.java  |  18 --
 .../exec/physical/base/PhysicalVisitor.java     |   7 -
 .../physical/config/AbstractMuxExchange.java    |  15 ++
 .../physical/config/OrderedMuxExchange.java     |  52 +++++
 .../physical/config/UnorderedMuxExchange.java   |   9 +-
 .../exec/planner/physical/ExchangePrel.java     |  13 ++
 .../exec/planner/physical/HashPrelUtil.java     |  23 ++-
 .../physical/HashToMergeExchangePrel.java       |   4 -
 .../physical/HashToRandomExchangePrel.java      |  77 ++++++++
 .../physical/OrderedMuxExchangePrel.java        |  74 +++++++
 .../exec/planner/physical/PlannerSettings.java  |   1 +
 .../physical/SingleMergeExchangePrel.java       |  21 ++
 .../visitor/InsertLocalExchangeVisitor.java     | 115 +----------
 .../planner/physical/visitor/PrelVisitor.java   |   1 -
 .../server/options/SystemOptionManager.java     |   1 +
 .../src/main/resources/drill-module.conf        |   1 +
 .../physical/impl/TestOrderedMuxExchange.java   | 193 +++++++++++++++++++
 18 files changed, 479 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a1a94fa..fb2907d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -455,6 +455,9 @@ public final class ExecConstants {
   public static final String AVERAGE_FIELD_WIDTH_KEY = 
"planner.memory.average_field_width";
   public static final OptionValidator AVERAGE_FIELD_WIDTH = new 
PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE);
 
+  // Mux Exchange options.
+  public static final String ORDERED_MUX_EXCHANGE = 
"planner.enable_ordered_mux_exchange";
+
   // Resource management boot-time options.
 
   public static final String MAX_MEMORY_PER_NODE = 
"drill.exec.rm.memory_per_node";

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index a3b5f27..9933ddc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.HashToRandomExchange;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
@@ -32,14 +31,12 @@ import 
org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.physical.config.UnionExchange;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.config.Values;
 import org.apache.drill.exec.physical.config.WindowPOP;
@@ -176,16 +173,6 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
-  public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E 
{
-    return visitExchange(op, value);
-  }
-
-  @Override
-  public T visitRangeSender(RangeSender op, X value) throws E {
-    return visitSender(op, value);
-  }
-
-  @Override
   public T visitBroadcastSender(BroadcastSender op, X value) throws E {
     return visitSender(op, value);
   }
@@ -201,11 +188,6 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
-  public T visitUnionExchange(UnionExchange op, X value) throws E {
-    return visitExchange(op, value);
-  }
-
-  @Override
   public T visitProducerConsumer(ProducerConsumer op, X value) throws E {
     return visitOp(op, value);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index ae6ff60..0eadb79 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.HashToRandomExchange;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
@@ -32,14 +31,12 @@ import 
org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.physical.config.UnionExchange;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.config.Values;
 import org.apache.drill.exec.physical.config.WindowPOP;
@@ -58,7 +55,6 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitGroupScan(GroupScan groupScan, EXTRA value) throws EXCEP;
   public RETURN visitSubScan(SubScan subScan, EXTRA value) throws EXCEP;
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
-
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
   public RETURN visitUnion(UnionAll union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
@@ -81,12 +77,9 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP 
extends Throwable> {
   public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA 
value) throws EXCEP;
   public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) 
throws EXCEP;
   public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) 
throws EXCEP;
-  public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) 
throws EXCEP;
-  public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP;
   public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws 
EXCEP;
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
-  public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
   public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP;
   public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws 
EXCEP;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
index 6715f96..5a76dec 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
 import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -51,6 +52,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  * for the 10 fragments is 300 instead of earlier number 10*300.
  */
 public abstract class AbstractMuxExchange extends AbstractExchange {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractMuxExchange.class);
 
   // Ephemeral info used when creating execution fragments.
   protected Map<Integer, MinorFragmentEndpoint> senderToReceiverMapping;
@@ -90,6 +92,19 @@ public abstract class AbstractMuxExchange extends 
AbstractExchange {
     return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, 
receiver.getEndpoint());
   }
 
+  protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = 
receiverToSenderMapping.get(minorFragmentId);
+
+    logger.debug("Minor fragment {}, receives data from following senders {}", 
minorFragmentId, senders);
+    if (senders == null || senders.size() <= 0) {
+      throw new IllegalStateException(String.format("Failed to find senders 
for receiver [%d]", minorFragmentId));
+    }
+
+    return senders;
+  }
+
   protected void createSenderReceiverMapping() {
     if (isSenderReceiverMappingCreated) {
       return;

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
new file mode 100644
index 0000000..2b9653a
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange whose incoming batches are sorted;
+ * a merge operation is performed on them to produce a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+
+  /** Sort orderings the merging receiver uses to merge the incoming streams. */
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  /**
+   * Exposes the orderings under the same JSON property name the constructor reads,
+   * so a serialized plan keeps its orderings instead of silently dropping them.
+   */
+  @JsonProperty("orderings")
+  public List<Ordering> getOrderings() {
+    return orderings;
+  }
+
+  /**
+   * Creates a MergingReceiverPOP for the given minor fragment that merges the
+   * streams of the senders mapped to it using {@code orderings}.
+   */
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    return new MergingReceiverPOP(senderMajorFragmentId, getSenders(minorFragmentId), orderings, false);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new OrderedMuxExchange(child, orderings);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
index 46f1fd7..fed8e54 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
@@ -38,14 +38,7 @@ public class UnorderedMuxExchange extends 
AbstractMuxExchange {
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    createSenderReceiverMapping();
-
-    List<MinorFragmentEndpoint> senders = 
receiverToSenderMapping.get(minorFragmentId);
-    if (senders == null || senders.size() <= 0) {
-      throw new IllegalStateException(String.format("Failed to find senders 
for receiver [%d]", minorFragmentId));
-    }
-
-    return new UnorderedReceiver(senderMajorFragmentId, senders, false);
+    return new UnorderedReceiver(senderMajorFragmentId, 
getSenders(minorFragmentId), false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java
index 9ad1c67..f3f87ba 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java
@@ -22,6 +22,9 @@ import 
org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.drill.exec.server.options.OptionManager;
+
+import java.util.Collections;
 
 public abstract class ExchangePrel extends SinglePrel{
 
@@ -34,4 +37,14 @@ public abstract class ExchangePrel extends SinglePrel{
     return logicalVisitor.visitExchange(this, value);
   }
 
+  /**
+   * Derived classes can override this method to create the relevant mux exchanges.
+   * If it is not overridden, the default returns a copy of this exchange with {@code child} as its input.
+   * @param child input to the new muxPrel or new Exchange node.
+   * @param options options manager that overrides use to check if mux is enabled; ignored by this default.
+   */
+  public Prel constructMuxPrel(Prel child, OptionManager options) {
+    return (Prel)copy(getTraitSet(), 
Collections.singletonList(((RelNode)child)));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
index 25b7017..9c12cc5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
@@ -19,19 +19,15 @@ package org.apache.drill.exec.planner.physical;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.DirectExpression;
-import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -63,6 +59,21 @@ public class HashPrelUtil {
         }
       };
 
+  public static class RexNodeBasedHashExpressionCreatorHelper implements 
HashExpressionCreatorHelper<RexNode> {
+    private final RexBuilder rexBuilder;
+
+    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
+      this.rexBuilder = rexBuilder;
+    }
+
+    @Override
+    public RexNode createCall(String funcName, List<RexNode> inputFields) {
+      final DrillSqlOperator op =
+              new DrillSqlOperator(funcName, inputFields.size(), true, false);
+      return rexBuilder.makeCall(op, inputFields);
+    }
+  }
+
   // The hash32 functions actually use hash64 underneath.  The reason we want 
to call hash32 is that
   // the hash based operators make use of 4 bytes of hash value, not 8 bytes 
(for reduced memory use).
   private static final String HASH32_FUNCTION_NAME = "hash32";

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
index 671eb61..2272a9b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
@@ -90,10 +90,6 @@ public class HashToMergeExchangePrel extends ExchangePrel {
 
   }
 
-  public List<DistributionField> getDistFields() {
-    return this.distFields;
-  }
-
   public RelCollation getCollation() {
     return this.collation;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index b8a4001..2254c56 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -18,10 +18,17 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.calcite.linq4j.Ord;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashToRandomExchange;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -35,6 +42,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.drill.exec.server.options.OptionManager;
 
 
 public class HashToRandomExchangePrel extends ExchangePrel {
@@ -112,6 +120,75 @@ public class HashToRandomExchangePrel extends ExchangePrel 
{
     return pw;
   }
 
+  /**
+   * Rebuilds this exchange over {@code child}, adding UnorderedMux and/or UnorderedDeMux exchanges as each is enabled.
+   * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node.
+   * @param options options manager to check whether the mux and demux exchanges are enabled.
+   */
+  @Override
+  public Prel constructMuxPrel(Prel child, OptionManager options) {
+    boolean isMuxEnabled = 
options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
+    Prel newPrel = child;
+
+    final List<String> childFields = child.getRowType().getFieldNames();
+
+    List <RexNode> removeUpdatedExpr = null;
+
+    if (isMuxEnabled) {
+      // Insert Project Operator with new column that will be a hash for 
HashToRandomExchange fields
+      final List<DistributionField> distFields = getFields();
+      final List<String> outputFieldNames = Lists.newArrayList(childFields);
+      final RexBuilder rexBuilder = getCluster().getRexBuilder();
+      final List<RelDataTypeField> childRowTypeFields = 
child.getRowType().getFieldList();
+
+      final HashPrelUtil.HashExpressionCreatorHelper<RexNode> hashHelper =
+                                    new 
HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
+
+      final List<RexNode> distFieldRefs = 
Lists.newArrayListWithExpectedSize(distFields.size());
+      for (int i=0; i<distFields.size(); i++) {
+        final int fieldId = distFields.get(i).getFieldId();
+        
distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(),
 fieldId));
+      }
+
+      final List <RexNode> updatedExpr = 
Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+      removeUpdatedExpr = 
Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+      for (RelDataTypeField field : childRowTypeFields) {
+        RexNode rex = rexBuilder.makeInputRef(field.getType(), 
field.getIndex());
+        updatedExpr.add(rex);
+        removeUpdatedExpr.add(rex);
+      }
+
+      outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
+      final RexNode distSeed = 
rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // 
distribution seed
+      
updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, 
distSeed, hashHelper));
+
+      RelDataType rowType = 
RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr, 
outputFieldNames);
+
+      ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), 
child.getTraitSet(), child, updatedExpr, rowType);
+
+      newPrel = new 
UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), 
addColumnprojectPrel.getTraitSet(),
+              addColumnprojectPrel);
+    }
+
+    newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(), 
newPrel, getFields());
+
+    if 
(options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) {
+      HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) 
newPrel;
+      // Insert a DeMuxExchange to narrow down the number of receivers
+      newPrel = new UnorderedDeMuxExchangePrel(getCluster(), getTraitSet(), 
hashExchangePrel, hashExchangePrel.getFields());
+    }
+
+    if (isMuxEnabled) {
+      // remove earlier inserted Project Operator - since it creates issues 
down the road in HashJoin
+      RelDataType removeRowType = 
RexUtil.createStructType(newPrel.getCluster().getTypeFactory(), 
removeUpdatedExpr, childFields);
+
+      ProjectPrel removeColumnProjectPrel = new 
ProjectPrel(newPrel.getCluster(), newPrel.getTraitSet(),
+                                                            newPrel, 
removeUpdatedExpr, removeRowType);
+      return removeColumnProjectPrel;
+    }
+    return newPrel;
+  }
+
   @Override
   public SelectionVectorMode getEncoding() {
     return SelectionVectorMode.NONE;

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java
new file mode 100644
index 0000000..de9ff25
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.OrderedMuxExchange;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.linq4j.Ord;
+
+
+/**
+ * Planner counterpart of OrderedMuxExchange: a mux exchange whose receivers merge the incoming streams in the order given by its collation.
+ */
+public class OrderedMuxExchangePrel extends ExchangePrel {
+  private final RelCollation fieldCollation; // converted to orderings in getPhysicalOperator; listed as sort0, sort1, ... by explainTerms
+
+  public OrderedMuxExchangePrel(RelOptCluster cluster, RelTraitSet traits, 
RelNode child, RelCollation collation) {
+    super(cluster, traits, child);
+    this.fieldCollation = collation;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new OrderedMuxExchangePrel(getCluster(), traitSet, sole(inputs), 
fieldCollation);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    Prel child = (Prel) this.getInput();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    OrderedMuxExchange p = new OrderedMuxExchange(childPOP, 
PrelUtil.getOrdering(fieldCollation, getInput().getRowType()));
+    return creator.addMetadata(this, p);
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    super.explainTerms(pw);
+    for (Ord<RelFieldCollation> ord : 
Ord.zip(this.fieldCollation.getFieldCollations())) {
+      pw.item("sort" + ord.i, ord.e);
+    }
+    return pw;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 74ef601..1271ff9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -66,6 +66,7 @@ public class PlannerSettings implements Context{
   public static final OptionValidator NLJOIN_FOR_SCALAR = new 
BooleanValidator("planner.enable_nljoin_for_scalar_only");
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new 
RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 
Double.MAX_VALUE);
   public static final OptionValidator MUX_EXCHANGE = new 
BooleanValidator("planner.enable_mux_exchange");
+  public static final OptionValidator ORDERED_MUX_EXCHANGE = new 
BooleanValidator("planner.enable_ordered_mux_exchange");
   public static final OptionValidator DEMUX_EXCHANGE = new 
BooleanValidator("planner.enable_demux_exchange");
   public static final OptionValidator PARTITION_SENDER_THREADS_FACTOR = new 
LongValidator("planner.partitioner_sender_threads_factor");
   public static final OptionValidator PARTITION_SENDER_MAX_THREADS = new 
LongValidator("planner.partitioner_sender_max_threads");

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
index 26f7074..a03c2f5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
@@ -36,6 +36,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.drill.exec.server.options.OptionManager;
 
 public class SingleMergeExchangePrel extends ExchangePrel {
 
@@ -93,6 +94,28 @@ public class SingleMergeExchangePrel extends ExchangePrel {
     return creator.addMetadata(this, g);
   }
 
+  /**
+   * Rebuilds this exchange on top of the already-visited {@code child}. When both the ordered mux
+   * and the mux exchange options are enabled, an OrderedMuxExchange is inserted between
+   * {@code child} and the new SingleMergeExchange.
+   *
+   * @param child the visited input; it is used instead of {@code getInput()} so that local
+   *              exchanges already inserted further down the plan are kept.
+   * @param options option manager consulted for the ordered mux and mux exchange settings.
+   * @return a new SingleMergeExchangePrel with the same collation as this one.
+   */
+  @Override
+  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
+    Prel outPrel = child;
+    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val &&
+        options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val) {
+      // Must wrap the visited child, not getInput(), or the visitor's rewrites of the subtree are lost.
+      outPrel = new OrderedMuxExchangePrel(getCluster(), getTraitSet(), child, getCollation());
+    }
+
+    return new SingleMergeExchangePrel(getCluster(), getTraitSet(), outPrel, getCollation());
+  }
+
   @Override
   public RelWriter explainTerms(RelWriter pw) {
     super.explainTerms(pw);
@@ -106,6 +129,11 @@ public class SingleMergeExchangePrel extends ExchangePrel {
     return pw;
   }
 
+  /** Returns this exchange's collation; constructMuxPrel reuses it for the exchanges it creates. */
+  public RelCollation getCollation() {
+    return this.collation;
+  }
+
   @Override
   public SelectionVectorMode getEncoding() {
     return SelectionVectorMode.NONE;

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
index fba4cbe..3d6e0be 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
@@ -20,133 +20,50 @@ package org.apache.drill.exec.planner.physical.visitor;
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.planner.physical.ExchangePrel;
-import org.apache.drill.exec.planner.physical.HashPrelUtil;
-import org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
-import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
-import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.math.BigDecimal;
-import java.util.Collections;
 import java.util.List;
 
 public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
-  private final boolean isMuxEnabled;
-  private final boolean isDeMuxEnabled;
-
-
-  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> {
-    private final RexBuilder rexBuilder;
+  private final OptionManager options;
 
-    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
-      this.rexBuilder = rexBuilder;
-    }
-
-    @Override
-    public RexNode createCall(String funcName, List<RexNode> inputFields) {
-      final DrillSqlOperator op =
-          new DrillSqlOperator(funcName, inputFields.size(), true, false);
-      return rexBuilder.makeCall(op, inputFields);
-    }
+  /**
+   * Returns true if any local exchange option (mux, demux or ordered mux) is enabled, i.e. whether
+   * {@link #insertLocalExchanges} has to visit the plan at all.
+   */
+  private static boolean isAnyLocalExchangeEnabled(OptionManager options) {
+    return options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val ||
+           options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val ||
+           options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val;
   }
 
+  /**
+   * Returns {@code prel} visited by this visitor when any local exchange option is enabled in
+   * {@code options}; otherwise returns {@code prel} unchanged.
+   */
   public static Prel insertLocalExchanges(Prel prel, OptionManager options) {
-    boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
-    boolean isDeMuxEnabled = options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val;
 
-    if (isMuxEnabled || isDeMuxEnabled) {
-      return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, isDeMuxEnabled), null);
+    if (isAnyLocalExchangeEnabled(options)) {
+      return prel.accept(new InsertLocalExchangeVisitor(options), null);
     }
 
     return prel;
   }
 
-  public InsertLocalExchangeVisitor(boolean isMuxEnabled, boolean isDeMuxEnabled) {
-    this.isMuxEnabled = isMuxEnabled;
-    this.isDeMuxEnabled = isDeMuxEnabled;
+  public InsertLocalExchangeVisitor(OptionManager options) {
+    this.options = options;
   }
 
+  /**
+   * Visits the exchange's input first, then lets the exchange rebuild itself over the visited
+   * child through {@link ExchangePrel#constructMuxPrel}, which may insert local exchanges.
+   */
   @Override
   public Prel visitExchange(ExchangePrel prel, Void value) throws RuntimeException {
     Prel child = ((Prel)prel.getInput()).accept(this, null);
-    // Whenever we encounter a HashToRandomExchangePrel
-    //   If MuxExchange is enabled, insert a UnorderedMuxExchangePrel before HashToRandomExchangePrel.
-    //   If DeMuxExchange is enabled, insert a UnorderedDeMuxExchangePrel after HashToRandomExchangePrel.
-    if (!(prel instanceof HashToRandomExchangePrel)) {
-      return (Prel)prel.copy(prel.getTraitSet(), Collections.singletonList(((RelNode)child)));
-    }
-
-    Prel newPrel = child;
-
-    final HashToRandomExchangePrel hashPrel = (HashToRandomExchangePrel) prel;
-    final List<String> childFields = child.getRowType().getFieldNames();
-
-    List <RexNode> removeUpdatedExpr = null;
-
-    if (isMuxEnabled) {
-      // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields
-      final List<DistributionField> distFields = hashPrel.getFields();
-      final List<String> outputFieldNames = Lists.newArrayList(childFields);
-      final RexBuilder rexBuilder = prel.getCluster().getRexBuilder();
-      final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList();
-
-      final HashExpressionCreatorHelper<RexNode> hashHelper = new RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
-      final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size());
-      for(int i=0; i<distFields.size(); i++) {
-        final int fieldId = distFields.get(i).getFieldId();
-        distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(), fieldId));
-      }
-
-      final List <RexNode> updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
-      removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
-      for ( RelDataTypeField field : childRowTypeFields) {
-        RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex());
-        updatedExpr.add(rex);
-        removeUpdatedExpr.add(rex);
-      }
-
-      outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
-      final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed
-      updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper));
-
-      RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), updatedExpr, outputFieldNames);
-
-      ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType);
-
-      newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(),
-          addColumnprojectPrel);
-    }
-
-    newPrel = new HashToRandomExchangePrel(prel.getCluster(),
-        prel.getTraitSet(), newPrel, ((HashToRandomExchangePrel) prel).getFields());
-
-    if (isDeMuxEnabled) {
-      HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
-      // Insert a DeMuxExchange to narrow down the number of receivers
-      newPrel = new UnorderedDeMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), hashExchangePrel,
-          hashExchangePrel.getFields());
-    }
-
-    if ( isMuxEnabled ) {
-      // remove earlier inserted Project Operator - since it creates issues down the road in HashJoin
-      RelDataType removeRowType = RexUtil.createStructType(newPrel.getCluster().getTypeFactory(), removeUpdatedExpr, childFields);
-
-      ProjectPrel removeColumnProjectPrel = new ProjectPrel(newPrel.getCluster(), newPrel.getTraitSet(), newPrel, removeUpdatedExpr, removeRowType);
-      return removeColumnProjectPrel;
-    }
-    return newPrel;
+    return prel.constructMuxPrel(child, options);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
index b3a25c1..0e4d05c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
@@ -35,7 +35,6 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
-
   public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 2b170e7..d18acb7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -91,6 +91,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(PlannerSettings.NLJOIN_FOR_SCALAR),
       new OptionDefinition(PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR),
       new OptionDefinition(PlannerSettings.MUX_EXCHANGE),
+      new OptionDefinition(PlannerSettings.ORDERED_MUX_EXCHANGE),
       new OptionDefinition(PlannerSettings.DEMUX_EXCHANGE),
       new OptionDefinition(PlannerSettings.PRODUCER_CONSUMER),
       new OptionDefinition(PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE),

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 39320c2..2305a30 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -479,6 +479,7 @@ drill.exec.options: {
     planner.enable_mergejoin: true,
     planner.enable_multiphase_agg: true,
     planner.enable_mux_exchange: true,
+    planner.enable_ordered_mux_exchange: true,
     planner.enable_nestedloopjoin: true,
     planner.enable_nljoin_for_scalar_only: true,
     planner.enable_streamagg: true,

http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
new file mode 100644
index 0000000..cee36fa
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+  private final static String TOPN = "TopN";
+  private final static int NUM_DEPTS = 40;
+  private final static int NUM_EMPLOYEES = 1000;
+  private final static int NUM_MNGRS = 1;
+  private final static int NUM_IDS = 1;
+  private final static String MATCH_PATTERN_ACROSS_LINES = "((?s).*[\\n\\r].*)";
+  private static final String EMP_TABLE = "empTable";
+
+  /**
+   * Generates the {@code empTable} table: NUM_EMPLOYEES JSON records spread over several files,
+   * each record having the columns "emp_id", "emp_name", "dept_id", "mng_id" and "some_id".
+   */
+  @BeforeClass
+  public static void generateTestDataAndQueries() throws Exception {
+    final File empTableLocation = dirTestWatcher.makeRootSubDir(Paths.get(EMP_TABLE));
+
+    // Write empNumRecsPerFile records to each file.
+    final int empNumRecsPerFile = 100;
+    for (int fileIndex = 0; fileIndex < NUM_EMPLOYEES / empNumRecsPerFile; fileIndex++) {
+      File file = new File(empTableLocation, fileIndex + ".json");
+      // try-with-resources closes the writer even if writing a record fails.
+      try (PrintWriter printWriter = new PrintWriter(file)) {
+        for (int recordIndex = fileIndex * empNumRecsPerFile; recordIndex < (fileIndex + 1) * empNumRecsPerFile; recordIndex++) {
+          String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d, \"mng_id\" : %d, \"some_id\" : %d }",
+              recordIndex, recordIndex, recordIndex % NUM_DEPTS, recordIndex % NUM_MNGRS, recordIndex % NUM_IDS);
+          printWriter.println(record);
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs {@code sql} on a new cluster (maxParallelization 1, slice target 10) and checks that it
+   * returns the same rows, in the same order, as a baseline run with
+   * {@code planner.enable_ordered_mux_exchange} disabled. The option is then re-enabled for the
+   * session and the EXPLAIN text of {@code sql} is returned.
+   *
+   * @param sql query to run; its ORDER BY must define a total order so the ordered comparison is stable
+   * @return the EXPLAIN plan text of {@code sql} with the ordered mux exchange enabled
+   * @throws Exception if the cluster cannot be started or a query fails
+   */
+  private static String runAndExplain(String sql) throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .maxParallelization(1)
+        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client.alterSession(ExecConstants.SLICE_TARGET, 10);
+      client.testBuilder()
+          .ordered()
+          .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 10;")
+          .sqlQuery(sql)
+          // Only the baseline turns the ordered mux exchange off.
+          .optionSettingQueriesForBaseline("alter session set `planner.enable_ordered_mux_exchange` = false")
+          .sqlBaselineQuery(sql)
+          .build()
+          .run();
+      // The baseline's option query may have left the option disabled for this session.
+      client.alterSession(ExecConstants.ORDERED_MUX_EXCHANGE, true);
+      return client.queryBuilder().sql(sql).explainText();
+    }
+  }
+
+  /**
+   * Asserts that TopN appears on a later line of {@code explainText} than OrderedMuxExchange,
+   * i.e. the limit was pushed below the OrderedMuxExchange.
+   */
+  private static void assertTopNBelowOrderedMux(String explainText) {
+    String pattern = String.format(MATCH_PATTERN_ACROSS_LINES + "%s" + MATCH_PATTERN_ACROSS_LINES + "%s" + MATCH_PATTERN_ACROSS_LINES,
+        ORDERED_MUX_EXCHANGE, TOPN);
+    assertTrue("Expected " + TOPN + " below " + ORDERED_MUX_EXCHANGE + " in plan:\n" + explainText,
+        explainText.matches(pattern));
+  }
+
+  /**
+   * Verifies that an OrderedMuxExchange is planned for an ORDER BY query and that the query returns
+   * the same rows, in the same order, as with the ordered mux exchange disabled.
+   *
+   * @throws Exception if anything goes wrong
+   */
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+    String explainText = runAndExplain("SELECT emp_id, emp_name FROM dfs.`empTable` e order BY emp_name, emp_id");
+    assertTrue("Expected " + ORDERED_MUX_EXCHANGE + " in plan:\n" + explainText,
+        explainText.contains(ORDERED_MUX_EXCHANGE));
+  }
+
+  /**
+   * Verifies that an OrderedMuxExchange is planned for a window function query and that the query
+   * returns the same rows, in the same order, as with the ordered mux exchange disabled.
+   *
+   * @throws Exception if anything goes wrong
+   */
+  @Test
+  public void testOrderedMuxForWindowAgg() throws Exception {
+    String explainText = runAndExplain("SELECT emp_name, max(emp_id) over (order by emp_name) FROM dfs.`empTable` e order BY emp_name");
+    assertTrue("Expected " + ORDERED_MUX_EXCHANGE + " in plan:\n" + explainText,
+        explainText.contains(ORDERED_MUX_EXCHANGE));
+  }
+
+  /**
+   * Verifies that for ORDER BY with LIMIT an OrderedMuxExchange is planned with a TopN below it,
+   * and that the results match the run with the ordered mux exchange disabled.
+   *
+   * @throws Exception if anything goes wrong
+   */
+  @Test
+  public void testLimitOnOrderedMux() throws Exception {
+    String explainText = runAndExplain("SELECT emp_id, emp_name FROM dfs.`empTable` e order BY emp_name, emp_id limit 10");
+    assertTopNBelowOrderedMux(explainText);
+  }
+
+  /**
+   * Verifies that for ORDER BY with LIMIT over a window function an OrderedMuxExchange is planned
+   * with a TopN below it, and that the results match the run with the ordered mux exchange disabled.
+   *
+   * @throws Exception if anything goes wrong
+   */
+  @Test
+  public void testLimitOnOrderedMuxWindow() throws Exception {
+    String explainText = runAndExplain("SELECT emp_name, max(emp_id) over (order by emp_name) FROM dfs.`empTable` e order BY emp_name limit 10");
+    assertTopNBelowOrderedMux(explainText);
+  }
+}

Reply via email to