DRILL-133: LocalExchange planning and exec.

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/59aae348
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/59aae348
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/59aae348

Branch: refs/heads/master
Commit: 59aae348478d242d9e08c46df609e94c5436e63f
Parents: 2a57136
Author: vkorukanti <venki.koruka...@gmail.com>
Authored: Thu Jan 22 18:04:37 2015 -0800
Committer: vkorukanti <venki.koruka...@gmail.com>
Committed: Sun Mar 8 22:49:18 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/hbase/BaseHBaseTest.java   |   6 +-
 .../apache/drill/exec/hive/HiveTestBase.java    |   2 +-
 .../drill/exec/physical/EndpointAffinity.java   |  70 +++-
 .../exec/physical/MinorFragmentEndpoint.java    |  63 ++++
 .../exec/physical/base/AbstractExchange.java    |  68 +++-
 .../exec/physical/base/AbstractGroupScan.java   |   8 +-
 .../exec/physical/base/AbstractReceiver.java    |  21 +-
 .../exec/physical/base/AbstractSender.java      |  24 +-
 .../drill/exec/physical/base/Exchange.java      |  31 +-
 .../physical/base/PhysicalOperatorUtil.java     |  23 ++
 .../drill/exec/physical/base/Receiver.java      |   9 +-
 .../apache/drill/exec/physical/base/Sender.java |   7 +-
 .../physical/config/AbstractDeMuxExchange.java  | 143 ++++++++
 .../physical/config/AbstractMuxExchange.java    | 125 +++++++
 .../exec/physical/config/BroadcastExchange.java |  28 +-
 .../exec/physical/config/BroadcastSender.java   |  13 +-
 .../physical/config/HashPartitionSender.java    |  15 +-
 .../physical/config/HashToMergeExchange.java    |  31 +-
 .../physical/config/HashToRandomExchange.java   |  31 +-
 .../physical/config/MergingReceiverPOP.java     |  20 +-
 .../config/OrderedPartitionExchange.java        |  30 +-
 .../physical/config/OrderedPartitionSender.java |  27 +-
 .../drill/exec/physical/config/RangeSender.java |  12 +-
 .../drill/exec/physical/config/Screen.java      |   2 +-
 .../physical/config/SingleMergeExchange.java    |  38 +-
 .../exec/physical/config/SingleSender.java      |  52 ++-
 .../exec/physical/config/UnionExchange.java     |  36 +-
 .../physical/config/UnorderedDeMuxExchange.java |  56 +++
 .../physical/config/UnorderedMuxExchange.java   |  55 +++
 .../exec/physical/config/UnorderedReceiver.java |  21 +-
 .../exec/physical/impl/SingleSenderCreator.java |  23 +-
 .../BroadcastSenderRootExec.java                |  36 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |   7 +-
 .../PartitionSenderRootExec.java                |  10 +-
 .../partitionsender/PartitionerTemplate.java    |  12 +-
 .../UnorderedReceiverBatch.java                 |   7 +-
 .../drill/exec/planner/fragment/Fragment.java   |  37 +-
 .../planner/fragment/MakeFragmentsVisitor.java  |  12 +-
 .../exec/planner/fragment/Materializer.java     |   4 +
 .../planner/fragment/ParallelizationInfo.java   | 139 +++++++
 .../exec/planner/fragment/PlanningSet.java      |  15 +-
 .../planner/fragment/SimpleParallelizer.java    | 298 ++++++++++++---
 .../drill/exec/planner/fragment/Stats.java      |  28 +-
 .../exec/planner/fragment/StatsCollector.java   | 144 ++++----
 .../drill/exec/planner/fragment/Wrapper.java    |  93 ++---
 .../exec/planner/physical/PlannerSettings.java  |   2 +
 .../physical/UnorderedDeMuxExchangePrel.java    |  66 ++++
 .../physical/UnorderedMuxExchangePrel.java      |  56 +++
 .../visitor/InsertLocalExchangeVisitor.java     |  90 +++++
 .../planner/sql/handlers/DefaultSqlHandler.java |  24 +-
 .../server/options/SystemOptionManager.java     |   2 +
 .../exec/store/dfs/easy/EasyGroupScan.java      |   1 -
 .../exec/store/direct/DirectGroupScan.java      |   5 -
 .../exec/store/ischema/InfoSchemaGroupScan.java |   5 -
 .../drill/exec/store/mock/MockGroupScanPOP.java |   5 -
 .../drill/exec/store/sys/SystemTableScan.java   |   6 -
 .../drill/exec/util/ArrayWrappedIntIntMap.java  |  65 ++++
 .../exec/work/batch/AbstractDataCollector.java  |  53 ++-
 .../drill/exec/work/batch/MergingCollector.java |   5 +-
 .../exec/work/batch/PartitionedCollector.java   |   4 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  12 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  75 +++-
 .../java/org/apache/drill/PlanTestBase.java     |   5 +-
 .../exec/compile/TestClassTransformation.java   |   4 +-
 .../exec/physical/impl/TestLocalExchange.java   | 358 +++++++++++++++++++
 .../apache/drill/exec/pop/PopUnitTestBase.java  |  13 +-
 .../drill/exec/pop/TestFragmentChecker.java     |  13 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   6 +-
 .../exec/util/TestArrayWrappedIntIntMap.java    |  83 +++++
 pom.xml                                         |   3 +
 70 files changed, 2236 insertions(+), 657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 1152b7b..b955d3b 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hbase.HBaseStoragePlugin;
 import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -54,11 +55,12 @@ public class BaseHBaseTest extends BaseTestQuery {
     HBaseTestsSuite.configure(true, true);
     HBaseTestsSuite.initCluster();
 
-    storagePlugin = (HBaseStoragePlugin) 
bit.getContext().getStorage().getPlugin(HBASE_STORAGE_PLUGIN_NAME);
+    final StoragePluginRegistry pluginRegistry = 
getDrillbitContext().getStorage();
+    storagePlugin = (HBaseStoragePlugin) 
pluginRegistry.getPlugin(HBASE_STORAGE_PLUGIN_NAME);
     storagePluginConfig = storagePlugin.getConfig();
     storagePluginConfig.setEnabled(true);
     storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
-    bit.getContext().getStorage().createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, 
storagePluginConfig, true);
+    pluginRegistry.createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, 
storagePluginConfig, true);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
index 7e3b6c8..1c7e16d 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -31,7 +31,7 @@ public class HiveTestBase extends PlanTestBase {
 
   @BeforeClass
   public static void generateHive() throws Exception{
-    hiveTest = new HiveTestDataGenerator(bit.getContext().getStorage());
+    hiveTest = new HiveTestDataGenerator(getDrillbitContext().getStorage());
     hiveTest.createAndAddHiveTestPlugin();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
index df31f74..4d8e23f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -17,50 +17,86 @@
  */
 package org.apache.drill.exec.physical;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.protobuf.TextFormat;
 
-
-public class EndpointAffinity implements Comparable<EndpointAffinity>{
+/**
+ * EndpointAffinity captures affinity value for a given single Drillbit 
endpoint.
+ */
+public class EndpointAffinity {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
 
-  private DrillbitEndpoint endpoint;
-  private float affinity = 0.0f;
+  private final DrillbitEndpoint endpoint;
+  private double affinity = 0.0d;
 
+  /**
+   * Create EndpointAffinity instance for given Drillbit endpoint. Affinity is 
initialized to 0. Affinity can be added
+   * after EndpointAffinity object creation using {@link #addAffinity(double)}.
+   *
+   * @param endpoint Drillbit endpoint.
+   */
   public EndpointAffinity(DrillbitEndpoint endpoint) {
-    super();
     this.endpoint = endpoint;
   }
 
-  public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
-    super();
+  /**
+   * Create EndpointAffinity instance for given Drillbit endpoint and affinity 
initialized to given affinity value.
+   * Affinity can be added after EndpointAffinity object creation using {@link 
#addAffinity(double)}.
+   *
+   * @param endpoint Drillbit endpoint.
+   * @param affinity Initial affinity value.
+   */
+  public EndpointAffinity(DrillbitEndpoint endpoint, double affinity) {
     this.endpoint = endpoint;
     this.affinity = affinity;
   }
 
+  /**
+   * Return the Drillbit endpoint in this instance.
+   *
+   * @return Drillbit endpoint.
+   */
   public DrillbitEndpoint getEndpoint() {
     return endpoint;
   }
-  public void setEndpoint(DrillbitEndpoint endpoint) {
-    this.endpoint = endpoint;
-  }
-  public float getAffinity() {
+
+  /**
+   * Get the affinity value. Affinity value is Double.POSITIVE_INFINITY if the 
Drillbit endpoint requires an assignment.
+   *
+   * @return affinity value
+   */
+  public double getAffinity() {
     return affinity;
   }
 
-  @Override
-  public int compareTo(EndpointAffinity o) {
-    return Float.compare(affinity, o.affinity);
+  /**
+   * Add given affinity value to existing affinity value.
+   *
+   * @param f Affinity value (must be a non-negative value).
+   * @throws java.lang.IllegalArgumentException If the given affinity value is 
negative.
+   */
+  public void addAffinity(double f){
+    Preconditions.checkArgument(f >= 0.0d, "Affinity should not be negative");
+    if (Double.POSITIVE_INFINITY == f) {
+      affinity = f;
+    } else if (Double.POSITIVE_INFINITY != affinity) {
+      affinity += f;
+    }
   }
 
-  public void addAffinity(float f){
-    affinity += f;
+  /**
+   * Is this endpoint required to be in fragment endpoint assignment list?
+   *
+   * @return Returns true for mandatory assignment, false otherwise.
+   */
+  public boolean isAssignmentRequired() {
+    return Double.POSITIVE_INFINITY == affinity;
   }
 
   @Override
   public String toString() {
     return "EndpointAffinity [endpoint=" + 
TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java
new file mode 100644
index 0000000..c33836c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+/**
+ * MinorFragmentEndpoint represents fragment's MinorFragmentId and Drillbit 
endpoint to which the fragment is
+ * assigned for execution.
+ */
+@JsonTypeName("fragment-endpoint")
+public class MinorFragmentEndpoint {
+  private final int id;
+  private final DrillbitEndpoint endpoint;
+
+  @JsonCreator
+  public MinorFragmentEndpoint(@JsonProperty("minorFragmentId") int id, 
@JsonProperty("endpoint") DrillbitEndpoint endpoint) {
+    this.id = id;
+    this.endpoint = endpoint;
+  }
+
+  /**
+   * Get the minor fragment id.
+   * @return Minor fragment id.
+   */
+  @JsonProperty("minorFragmentId")
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * Get the Drillbit endpoint where the fragment is assigned for execution.
+   *
+   * @return Drillbit endpoint.
+   */
+  @JsonProperty("endpoint")
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+  @Override
+  public String toString() {
+    return "FragmentEndPoint: id = " + id + ", ep = " + endpoint;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
index 73280ea..5fbe838 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -17,16 +17,26 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 public abstract class AbstractExchange extends AbstractSingle implements 
Exchange {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
 
+  // Ephemeral info for generating execution fragments.
   protected int senderMajorFragmentId;
   protected int receiverMajorFragmentId;
+  protected List<DrillbitEndpoint> senderLocations;
+  protected List<DrillbitEndpoint> receiverLocations;
 
   public AbstractExchange(PhysicalOperator child) {
     super(child);
@@ -41,13 +51,58 @@ public abstract class AbstractExchange extends 
AbstractSingle implements Exchang
     return false;
   }
 
+  /**
+   * Default sender parallelization width range is [1, Integer.MAX_VALUE] and 
no endpoint affinity
+   * @param receiverFragmentEndpoints Endpoints assigned to receiver fragment 
if available, otherwise an empty list.
+   * @return
+   */
+  @Override
+  public ParallelizationInfo 
getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints) {
+    return ParallelizationInfo.UNLIMITED_WIDTH_NO_ENDPOINT_AFFINITY;
+  }
+
+  /**
+   * Default receiver parallelization width range is [1, Integer.MAX_VALUE] 
and affinity to nodes where sender
+   * fragments are running.
+   * @param senderFragmentEndpoints Endpoints assigned to receiver fragment if 
available, otherwise an empty list.
+   * @return
+   */
   @Override
-  public int getMaxReceiveWidth() {
-    return Integer.MAX_VALUE;
+  public ParallelizationInfo 
getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) {
+    Preconditions.checkArgument(senderFragmentEndpoints != null && 
senderFragmentEndpoints.size() > 0,
+        "Sender fragment endpoint list should not be empty");
+
+    return ParallelizationInfo.create(1, Integer.MAX_VALUE, 
getDefaultAffinityMap(senderFragmentEndpoints));
   }
 
-  protected abstract void setupSenders(List<DrillbitEndpoint> senderLocations) 
throws PhysicalOperatorSetupException ;
-  protected abstract void setupReceivers(List<DrillbitEndpoint> 
senderLocations) throws PhysicalOperatorSetupException ;
+  /**
+   * Get a default endpoint affinity map where affinity of a Drillbit is 
proportional to the number of its occurrances
+   * in given endpoint list.
+   *
+   * @param fragmentEndpoints Drillbit endpoint assignments of fragments.
+   * @return List of EndpointAffinity objects for each Drillbit endpoint given 
<i>fragmentEndpoints</i>.
+   */
+  protected static List<EndpointAffinity> 
getDefaultAffinityMap(List<DrillbitEndpoint> fragmentEndpoints) {
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
+    final double affinityPerOccurrence = 1.0d / fragmentEndpoints.size();
+    for(DrillbitEndpoint sender : fragmentEndpoints) {
+      if (affinityMap.containsKey(sender)) {
+        affinityMap.get(sender).addAffinity(affinityPerOccurrence);
+      } else {
+        affinityMap.put(sender, new EndpointAffinity(sender, 
affinityPerOccurrence));
+      }
+    }
+
+    return new ArrayList(affinityMap.values());
+  }
+
+  protected void setupSenders(List<DrillbitEndpoint> senderLocations) throws 
PhysicalOperatorSetupException {
+    this.senderLocations = ImmutableList.copyOf(senderLocations);
+  }
+
+  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) 
throws PhysicalOperatorSetupException {
+    this.receiverLocations = ImmutableList.copyOf(receiverLocations);
+  }
 
   @Override
   public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> 
senderLocations) throws PhysicalOperatorSetupException {
@@ -72,5 +127,8 @@ public abstract class AbstractExchange extends 
AbstractSingle implements Exchang
     throw new UnsupportedOperationException();
   }
 
-
+  @Override
+  public ParallelizationDependency getParallelizationDependency() {
+    return ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 5d0d9bf..276ecb5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -24,17 +25,22 @@ import org.apache.drill.common.expression.SchemaPath;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Iterators;
+import org.apache.drill.exec.physical.EndpointAffinity;
 
 public abstract class AbstractGroupScan extends AbstractBase implements 
GroupScan {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
 
-
   @Override
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.emptyIterator();
   }
 
   @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  @Override
   public boolean isExecutable() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
index f621a26..f01d025 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
@@ -20,18 +20,28 @@ package org.apache.drill.exec.physical.base;
 import java.util.Iterator;
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 
 public abstract class AbstractReceiver extends AbstractBase implements 
Receiver{
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractReceiver.class);
 
   private final int oppositeMajorFragmentId;
+  private final List<MinorFragmentEndpoint> senders;
 
-  public AbstractReceiver(int oppositeMajorFragmentId){
+  /**
+   * @param oppositeMajorFragmentId MajorFragmentId of fragments that are 
sending data to this receiver.
+   * @param senders List of sender MinorFragmentEndpoints each containing 
sender MinorFragmentId and Drillbit endpoint
+   *                where it is running.
+   */
+  public AbstractReceiver(int oppositeMajorFragmentId, 
List<MinorFragmentEndpoint> senders){
     this.oppositeMajorFragmentId = oppositeMajorFragmentId;
+    this.senders = ImmutableList.copyOf(senders);
   }
 
   @Override
@@ -56,5 +66,14 @@ public abstract class AbstractReceiver extends AbstractBase 
implements Receiver{
     return oppositeMajorFragmentId;
   }
 
+  @JsonProperty("senders")
+  public List<MinorFragmentEndpoint> getProvidingEndpoints() {
+    return senders;
+  }
+
+  @JsonIgnore
+  public int getNumSenders() {
+    return senders.size();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
index 53a0721..2e42f26 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
@@ -18,16 +18,29 @@
 package org.apache.drill.exec.physical.base;
 
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 
+import java.util.List;
 
 public abstract class AbstractSender extends AbstractSingle implements Sender {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractSender.class);
 
   protected final int oppositeMajorFragmentId;
-
-  public AbstractSender(int oppositeMajorFragmentId, PhysicalOperator child) {
+  //
+  protected final List<MinorFragmentEndpoint> destinations;
+
+  /**
+   * @param oppositeMajorFragmentId MajorFragmentId of fragments that are 
receiving data sent by this sender.
+   * @param child Child PhysicalOperator which is providing data to this 
Sender.
+   * @param destinations List of receiver MinorFragmentEndpoints each 
containing MinorFragmentId and Drillbit endpoint
+   *                     where it is running.
+   */
+  public AbstractSender(int oppositeMajorFragmentId, PhysicalOperator child, 
List<MinorFragmentEndpoint> destinations) {
     super(child);
     this.oppositeMajorFragmentId = oppositeMajorFragmentId;
+    this.destinations = ImmutableList.copyOf(destinations);
   }
 
   @Override
@@ -36,9 +49,14 @@ public abstract class AbstractSender extends AbstractSingle 
implements Sender {
   }
 
   @Override
+  @JsonProperty("receiver-major-fragment")
   public int getOppositeMajorFragmentId() {
     return oppositeMajorFragmentId;
   }
 
-
+  @Override
+  @JsonProperty("destinations")
+  public List<MinorFragmentEndpoint> getDestinations() {
+    return destinations;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index 7be7f20..d8014f5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base;
 import java.util.List;
 
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -27,6 +28,19 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 public interface Exchange extends PhysicalOperator {
 
   /**
+   * Exchanges are fragment boundaries in physical operator tree. It is 
divided into two parts. First part is Sender
+   * which becomes part of the sending fragment. Second part is Receiver which 
becomes part of the fragment that
+   * receives the data.
+   *
+   * Assignment dependency describes whether sender fragments depend on 
receiver fragment's endpoint assignment for
+   * determining its parallelization and endpoint assignment and vice versa.
+   */
+  public enum ParallelizationDependency {
+    SENDER_DEPENDS_ON_RECEIVER, // Sending fragment depends on receiving 
fragment for parallelization
+    RECEIVER_DEPENDS_ON_SENDER, // Receiving fragment depends on sending 
fragment for parallelization (default value).
+  }
+
+  /**
    * Inform this Exchange node about its sender locations. This list should be 
index-ordered the same as the expected
    * minorFragmentIds for each sender.
    *
@@ -65,20 +79,24 @@ public interface Exchange extends PhysicalOperator {
   public abstract Receiver getReceiver(int minorFragmentId);
 
   /**
-   * The widest width this sender can send (max sending parallelization). 
Typically Integer.MAX_VALUE.
+   * Provide parallelization parameters for sender side of the exchange. 
Output includes min width,
+   * max width and affinity to Drillbits.
    *
+   * @param receiverFragmentEndpoints Endpoints assigned to receiver fragment 
if available, otherwise an empty list.
    * @return
    */
   @JsonIgnore
-  public abstract int getMaxSendWidth();
+  public abstract ParallelizationInfo 
getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints);
 
   /**
-   * The widest width this receiver can receive(max receive parallelization). 
Default is Integer.MAX_VALUE.
+   * Provide parallelization parameters for receiver side of the exchange. 
Output includes min width,
+   * max width and affinity to Drillbits.
    *
+   * @param senderFragmentEndpoints Endpoints assigned to receiver fragment if 
available, otherwise an empty list
    * @return
    */
   @JsonIgnore
-  public abstract int getMaxReceiveWidth();
+  public abstract ParallelizationInfo 
getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints);
 
   /**
    * Return the feeding child of this operator node.
@@ -87,4 +105,9 @@ public interface Exchange extends PhysicalOperator {
    */
   public PhysicalOperator getChild();
 
+  /**
+   * Get the parallelization dependency of the Exchange.
+   */
+  @JsonIgnore
+  public ParallelizationDependency getParallelizationDependency();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
index dfcb113..487ce77 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
@@ -17,9 +17,14 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.config.CommonConstants;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.List;
 
 public class PhysicalOperatorUtil {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class);
@@ -31,4 +36,22 @@ public class PhysicalOperatorUtil {
     logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) );
     return ops;
   }
+
+  /**
+   * Helper method to create a list of MinorFragmentEndpoint instances from a 
given endpoint assignment list.
+   *
+   * @param endpoints Assigned endpoint list. Index of each endpoint in list 
indicates the MinorFragmentId of the
+   *                  fragment that is assigned to the endpoint.
+   * @return
+   */
+  public static List<MinorFragmentEndpoint> 
getIndexOrderedEndpoints(List<DrillbitEndpoint> endpoints) {
+    List<MinorFragmentEndpoint> destinations = Lists.newArrayList();
+    int minorFragmentId = 0;
+    for(DrillbitEndpoint endpoint : endpoints) {
+      destinations.add(new MinorFragmentEndpoint(minorFragmentId, endpoint));
+      minorFragmentId++;
+    }
+
+    return destinations;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
index 0c67770..04d6d7e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 import java.util.List;
 
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,10 +31,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public interface Receiver extends FragmentLeaf {
 
   /**
-   * A receiver is expecting streams from one or more providing endpoints.  
This method should return a list of the expected sending endpoints.
-   * @return List of counterpart sending DrillbitEndpoints.
+   * A receiver is expecting streams from one or more providing endpoints.
+   * @return List of sender MinorFragmentEndpoints each containing sender 
fragment MinorFragmentId and endpoint where
+   * it is running.
    */
-  public abstract List<DrillbitEndpoint> getProvidingEndpoints();
+  public abstract List<MinorFragmentEndpoint> getProvidingEndpoints();
 
   /**
    * Whether or not this receive supports out of order exchange. This provides 
a hint for the scheduling node on whether

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
index bbd1b2c..e4f2adf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 import java.util.List;
 
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -31,9 +31,10 @@ public interface Sender extends FragmentRoot {
 
   /**
    * Get the list of destination endpoints that this Sender will be 
communicating with.
-   * @return List of DrillbitEndpoints.
+   * @return List of receiver MinorFragmentEndpoints each containing receiver 
fragment MinorFragmentId and endpoint
+   * where it is running.
    */
-  public abstract List<DrillbitEndpoint> getDestinations();
+  public abstract List<MinorFragmentEndpoint> getDestinations();
 
   /**
    * Get the receiver major fragment id that is opposite this sender.

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java
new file mode 100644
index 0000000..352e84b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * DeMuxExchange is opposite of MuxExchange. It is used when the sender has 
overhead that is proportional to the
+ * number of receivers. DeMuxExchange is run one instance per Drillbit 
endpoint which collects and distributes data
+ * belonging to local receiving fragments running on the same Drillbit.
+ *
+ * Example:
+ * On a 3 node cluster, if the sender has 10 receivers on each node each 
sender requires 30 buffers. By inserting
+ * DeMuxExchange, we create one receiver per node which means total of 3 
receivers for each sender. If the number of
+ * senders is 10, we use 10*3 buffers instead of 10*30. DeMuxExchange has a 
overhead of buffer space that is equal to
+ * number of local receivers. In this case each DeMuxExchange needs 10 
buffers, so total of 3*10 buffers.
+ */
+public abstract class AbstractDeMuxExchange extends AbstractExchange {
+  protected final LogicalExpression expr;
+
+  // Ephemeral info used when creating execution fragments.
+  protected Map<Integer, MinorFragmentEndpoint> receiverToSenderMapping;
+  protected ArrayListMultimap<Integer, MinorFragmentEndpoint> 
senderToReceiversMapping;
+  private boolean isSenderReceiverMappingCreated;
+
+  public AbstractDeMuxExchange(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("expr") LogicalExpression expr) {
+    super(child);
+    this.expr = expr;
+  }
+
+  @JsonProperty("expr")
+  public LogicalExpression getExpression(){
+    return expr;
+  }
+
+  @Override
+  public ParallelizationInfo 
getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints) {
+    Preconditions.checkArgument(receiverFragmentEndpoints != null && 
receiverFragmentEndpoints.size() > 0,
+        "Receiver fragment endpoint list should not be empty");
+
+    // We want to run one demux sender per Drillbit endpoint.
+    // Identify the number of unique Drillbit endpoints in receiver fragment 
endpoints.
+    List<DrillbitEndpoint> drillbitEndpoints = 
ImmutableSet.copyOf(receiverFragmentEndpoints).asList();
+
+    List<EndpointAffinity> affinities = Lists.newArrayList();
+    for(DrillbitEndpoint ep : drillbitEndpoints) {
+      affinities.add(new EndpointAffinity(ep, Double.POSITIVE_INFINITY));
+    }
+
+    return ParallelizationInfo.create(affinities.size(), affinities.size(), 
affinities);
+  }
+
+  @Override
+  public ParallelizationInfo 
getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) {
+    return ParallelizationInfo.UNLIMITED_WIDTH_NO_ENDPOINT_AFFINITY;
+  }
+
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> receivers = 
senderToReceiversMapping.get(minorFragmentId);
+    if (receivers == null || receivers.size() <= 0) {
+      throw new IllegalStateException(String.format("Failed to find receivers 
for sender [%d]", minorFragmentId));
+    }
+
+    return new HashPartitionSender(receiverMajorFragmentId, child, expr, 
receivers);
+  }
+
+  /**
+   * In DeMuxExchange, sender fragment parallelization and endpoint assignment 
depends on receiver fragment endpoint
+   * assignments.
+   */
+  @Override
+  public ParallelizationDependency getParallelizationDependency() {
+    return ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER;
+  }
+
+  protected void createSenderReceiverMapping() {
+    if (isSenderReceiverMappingCreated) {
+      return;
+    }
+
+    senderToReceiversMapping = ArrayListMultimap.create();
+    receiverToSenderMapping = Maps.newHashMap();
+
+    // Find the list of receiver fragment ids assigned to each Drillbit 
endpoint
+    ArrayListMultimap<DrillbitEndpoint, Integer> endpointReceiverList = 
ArrayListMultimap.create();
+
+    int receiverFragmentId = 0;
+    for(DrillbitEndpoint receiverLocation : receiverLocations) {
+      endpointReceiverList.put(receiverLocation, receiverFragmentId);
+      receiverFragmentId++;
+    }
+
+    int senderFragmentId = 0;
+    for(DrillbitEndpoint senderLocation : senderLocations) {
+      final List<Integer> receiverMinorFragmentIds = 
endpointReceiverList.get(senderLocation);
+
+      for(Integer receiverId : receiverMinorFragmentIds) {
+        receiverToSenderMapping.put(receiverId, new 
MinorFragmentEndpoint(senderFragmentId, senderLocation));
+
+        senderToReceiversMapping.put(senderFragmentId,
+            new MinorFragmentEndpoint(receiverId, 
receiverLocations.get(receiverId)));
+      }
+      senderFragmentId++;
+    }
+
+    isSenderReceiverMappingCreated = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
new file mode 100644
index 0000000..6715f96
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Multiplexing Exchange (MuxExchange) is used when results from multiple 
minor fragments belonging to the same
+ * major fragment running on a node need to be collected at one fragment on 
the same node before distributing the
+ * results further. This helps when the sender that is distributing the 
results has overhead that is proportional to
+ * the number of sender instances. An example of such sender is 
PartitionSender. Each instance of PartitionSender
+ * allocates "r" buffers where "r" is the number of receivers.
+ *
+ * Ex. Drillbit A is assigned 10 minor fragments belonging to the same major 
fragment. Each of these fragments
+ * has a PartitionSender instance which is sending data to 300 receivers. Each 
PartitionSender needs 300 buffers,
+ * so total of 10*300 buffers are needed. With MuxExchange, all 10 fragments 
send the data directly (without
+ * partitioning) to MuxExchange which uses the PartitionSender to partition 
the incoming data and distribute
+ * to receivers. MuxExchange has only one instance per Drillbit per major 
fragment which means only one instance of
+ * PartitionSender per Drillbit per major fragment. With MuxExchange total 
number of buffers used by PartitionSender
+ * for the 10 fragments is 300 instead of earlier number 10*300.
+ */
+public abstract class AbstractMuxExchange extends AbstractExchange {
+
+  // Ephemeral info used when creating execution fragments.
+  protected Map<Integer, MinorFragmentEndpoint> senderToReceiverMapping;
+  protected ArrayListMultimap<Integer, MinorFragmentEndpoint> 
receiverToSenderMapping;
+  private boolean isSenderReceiverMappingCreated;
+
+  public AbstractMuxExchange(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  public ParallelizationInfo 
getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) {
+    Preconditions.checkArgument(senderFragmentEndpoints != null && 
senderFragmentEndpoints.size() > 0,
+        "Sender fragment endpoint list should not be empty");
+
+    // We want to run one mux receiver per Drillbit endpoint.
+    // Identify the number of unique Drillbit endpoints in sender fragment 
endpoints.
+    List<DrillbitEndpoint> drillbitEndpoints = 
ImmutableSet.copyOf(senderFragmentEndpoints).asList();
+
+    List<EndpointAffinity> affinities = Lists.newArrayList();
+    for(DrillbitEndpoint ep : drillbitEndpoints) {
+      affinities.add(new EndpointAffinity(ep, Double.POSITIVE_INFINITY));
+    }
+
+    return ParallelizationInfo.create(affinities.size(), affinities.size(), 
affinities);
+  }
+
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    createSenderReceiverMapping();
+
+    MinorFragmentEndpoint receiver = 
senderToReceiverMapping.get(minorFragmentId);
+    if (receiver == null) {
+      throw new IllegalStateException(String.format("Failed to find receiver 
for sender [%d]", minorFragmentId));
+    }
+
+    return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, 
receiver.getEndpoint());
+  }
+
+  protected void createSenderReceiverMapping() {
+    if (isSenderReceiverMappingCreated) {
+      return;
+    }
+
+    senderToReceiverMapping = Maps.newHashMap();
+    receiverToSenderMapping = ArrayListMultimap.create();
+
+    // Find the list of sender fragment ids assigned to each Drillbit endpoint.
+    ArrayListMultimap<DrillbitEndpoint, Integer> endpointSenderList = 
ArrayListMultimap.create();
+
+    int senderFragmentId = 0;
+    for(DrillbitEndpoint senderLocation : senderLocations) {
+      endpointSenderList.put(senderLocation, senderFragmentId);
+      senderFragmentId++;
+    }
+
+    int receiverFragmentId = 0;
+    for(DrillbitEndpoint receiverLocation : receiverLocations) {
+      List<Integer> senderFragmentIds = 
endpointSenderList.get(receiverLocation);
+
+      for(Integer senderId : senderFragmentIds) {
+        senderToReceiverMapping.put(senderId, new 
MinorFragmentEndpoint(receiverFragmentId, receiverLocation));
+
+        receiverToSenderMapping.put(receiverFragmentId,
+            new MinorFragmentEndpoint(senderId, 
senderLocations.get(senderId)));
+      }
+      receiverFragmentId++;
+    }
+
+    isSenderReceiverMappingCreated = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
index 73a1d20..a37f638 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -17,15 +17,12 @@
  
******************************************************************************/
 package org.apache.drill.exec.physical.config;
 
-import java.util.List;
-
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -34,41 +31,24 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("broadcast-exchange")
 public class BroadcastExchange extends AbstractExchange {
 
-  private List<DrillbitEndpoint> senderLocations;
-  private List<DrillbitEndpoint> receiverLocations;
-
   @JsonCreator
   public BroadcastExchange(@JsonProperty("child") PhysicalOperator child) {
     super(child);
   }
 
   @Override
-  protected void setupSenders(List<DrillbitEndpoint> senderLocations) throws 
PhysicalOperatorSetupException {
-    this.senderLocations = senderLocations;
-  }
-
-  @Override
-  protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> 
receiverLocations) throws PhysicalOperatorSetupException {
-    this.receiverLocations = receiverLocations;
-  }
-
-  @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
     return new BroadcastExchange(child);
   }
 
   @Override
   public Sender getSender(int minorFragmentId, PhysicalOperator child) throws 
PhysicalOperatorSetupException {
-    return new BroadcastSender(receiverMajorFragmentId, child, 
receiverLocations);
+    return new BroadcastSender(receiverMajorFragmentId, child,
+        PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations));
   }
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, senderLocations);
-  }
-
-  @Override
-  public int getMaxSendWidth() {
-    return Integer.MAX_VALUE;
+    return new UnorderedReceiver(senderMajorFragmentId, 
PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
index 1827367..7ddfa95 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.List;
 
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -33,14 +33,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("broadcast-sender")
 public class BroadcastSender extends AbstractSender {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BroadcastSender.class);
-  private final List<DrillbitEndpoint> destinations;
 
   @JsonCreator
   public BroadcastSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
                          @JsonProperty("child") PhysicalOperator child,
-                         @JsonProperty("destinations") List<DrillbitEndpoint> 
destinations) {
-    super(oppositeMajorFragmentId, child);
-    this.destinations = destinations;
+                         @JsonProperty("destinations") 
List<MinorFragmentEndpoint> destinations) {
+    super(oppositeMajorFragmentId, child, destinations);
   }
 
   @Override
@@ -49,11 +47,6 @@ public class BroadcastSender extends AbstractSender {
   }
 
   @Override
-  public List<DrillbitEndpoint> getDestinations() {
-    return destinations;
-  }
-
-  @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
     return physicalVisitor.visitBroadcastSender(this, value);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
index bdb1362..52db80d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.physical.config;
 import java.util.List;
 
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,27 +34,20 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class HashPartitionSender extends AbstractSender {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class);
 
-  private final List<DrillbitEndpoint> endpoints;
   private final LogicalExpression expr;
 
   @JsonCreator
   public HashPartitionSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
                              @JsonProperty("child") PhysicalOperator child,
                              @JsonProperty("expr") LogicalExpression expr,
-                             @JsonProperty("destinations") 
List<DrillbitEndpoint> endpoints) {
-    super(oppositeMajorFragmentId, child);
+                             @JsonProperty("destinations") 
List<MinorFragmentEndpoint> endpoints) {
+    super(oppositeMajorFragmentId, child, endpoints);
     this.expr = expr;
-    this.endpoints = endpoints;
-  }
-
-  @Override
-  public List<DrillbitEndpoint> getDestinations() {
-    return endpoints;
   }
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new HashPartitionSender(oppositeMajorFragmentId, child, expr, 
endpoints);
+    return new HashPartitionSender(oppositeMajorFragmentId, child, expr, 
destinations);
   }
 
   public LogicalExpression getExpr() {

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
index f62d922..f45ace9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -23,9 +23,9 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -35,14 +35,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class HashToMergeExchange extends AbstractExchange{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class);
 
-
   private final LogicalExpression distExpr;
   private final List<Ordering> orderExprs;
 
-  //ephemeral for setup tasks.
-  private List<DrillbitEndpoint> senderLocations;
-  private List<DrillbitEndpoint> receiverLocations;
-
   @JsonCreator
   public HashToMergeExchange(@JsonProperty("child") PhysicalOperator child,
       @JsonProperty("expr") LogicalExpression expr,
@@ -53,29 +48,14 @@ public class HashToMergeExchange extends AbstractExchange{
   }
 
   @Override
-  public int getMaxSendWidth() {
-    return Integer.MAX_VALUE;
-  }
-
-
-  @Override
-  protected void setupSenders(List<DrillbitEndpoint> senderLocations) {
-    this.senderLocations = senderLocations;
-  }
-
-  @Override
-  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
-    this.receiverLocations = receiverLocations;
-  }
-
-  @Override
   public Sender getSender(int minorFragmentId, PhysicalOperator child) {
-    return new HashPartitionSender(receiverMajorFragmentId, child, distExpr, 
receiverLocations);
+    return new HashPartitionSender(receiverMajorFragmentId, child, distExpr,
+        PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations));
   }
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new MergingReceiverPOP(senderMajorFragmentId, senderLocations, 
orderExprs);
+    return new MergingReceiverPOP(senderMajorFragmentId, 
PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExprs);
   }
 
   @Override
@@ -87,7 +67,4 @@ public class HashToMergeExchange extends AbstractExchange{
   public List<Ordering> getOrderExpressions(){
     return orderExprs;
   }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
index fac374b..52d79c2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -22,9 +22,9 @@ import java.util.List;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -34,13 +34,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class HashToRandomExchange extends AbstractExchange{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashToRandomExchange.class);
 
-
   private final LogicalExpression expr;
 
-  //ephemeral for setup tasks.
-  private List<DrillbitEndpoint> senderLocations;
-  private List<DrillbitEndpoint> receiverLocations;
-
   @JsonCreator
   public HashToRandomExchange(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("expr") LogicalExpression expr) {
     super(child);
@@ -48,29 +43,14 @@ public class HashToRandomExchange extends AbstractExchange{
   }
 
   @Override
-  public int getMaxSendWidth() {
-    return Integer.MAX_VALUE;
-  }
-
-
-  @Override
-  protected void setupSenders(List<DrillbitEndpoint> senderLocations) {
-    this.senderLocations = senderLocations;
-  }
-
-  @Override
-  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
-    this.receiverLocations = receiverLocations;
-  }
-
-  @Override
   public Sender getSender(int minorFragmentId, PhysicalOperator child) {
-    return new HashPartitionSender(receiverMajorFragmentId, child, expr, 
receiverLocations);
+    return new HashPartitionSender(receiverMajorFragmentId, child, expr,
+        PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations));
   }
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, senderLocations);
+    return new UnorderedReceiver(senderMajorFragmentId, 
PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
   }
 
   @Override
@@ -82,7 +62,4 @@ public class HashToRandomExchange extends AbstractExchange{
   public LogicalExpression getExpression(){
     return expr;
   }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index f5dca1a..9416814 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -20,13 +20,12 @@ package org.apache.drill.exec.physical.config;
 import java.util.List;
 
 import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractReceiver;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
@@ -38,25 +37,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class MergingReceiverPOP extends AbstractReceiver{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MergingReceiverPOP.class);
 
-  private final List<DrillbitEndpoint> senders;
   private final List<Ordering> orderings;
 
   @JsonCreator
   public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int 
oppositeMajorFragmentId,
-                            @JsonProperty("senders") List<DrillbitEndpoint> 
senders,
+                            @JsonProperty("senders") 
List<MinorFragmentEndpoint> senders,
                             @JsonProperty("orderings") List<Ordering> 
orderings) {
-    super(oppositeMajorFragmentId);
-    this.senders = senders;
+    super(oppositeMajorFragmentId, senders);
     this.orderings = orderings;
   }
 
   @Override
-  @JsonProperty("senders")
-  public List<DrillbitEndpoint> getProvidingEndpoints() {
-    return senders;
-  }
-
-  @Override
   public boolean supportsOutOfOrderExchange() {
     return false;
   }
@@ -74,9 +65,4 @@ public class MergingReceiverPOP extends AbstractReceiver{
   public int getOperatorType() {
     return CoreOperatorType.MERGING_RECEIVER_VALUE;
   }
-
-  @JsonIgnore
-  public int getNumSenders() {
-    return senders.size();
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index 8e1526a..c8dbc22 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -23,9 +23,9 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -36,17 +36,12 @@ import com.google.common.base.Preconditions;
 public class OrderedPartitionExchange extends AbstractExchange {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OrderedPartitionExchange.class);
 
-
   private final List<Ordering> orderings;
   private final FieldReference ref;
   private int recordsToSample = 10000; // How many records must be received 
before analyzing
   private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of 
partitions to send to distributed cache
   private float completionFactor = .75f; // What fraction of fragments must be 
completed before attempting to build partition table
 
-  //ephemeral for setup tasks.
-  private List<DrillbitEndpoint> senderLocations;
-  private List<DrillbitEndpoint> receiverLocations;
-
   @JsonCreator
   public OrderedPartitionExchange(@JsonProperty("orderings") List<Ordering> 
orderings, @JsonProperty("ref") FieldReference ref,
                                   @JsonProperty("child") PhysicalOperator 
child, @JsonProperty("recordsToSample") Integer recordsToSample,
@@ -70,30 +65,15 @@ public class OrderedPartitionExchange extends 
AbstractExchange {
   }
 
   @Override
-  public int getMaxSendWidth() {
-    return Integer.MAX_VALUE;
-  }
-
-
-  @Override
-  protected void setupSenders(List<DrillbitEndpoint> senderLocations) {
-    this.senderLocations = senderLocations;
-  }
-
-  @Override
-  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
-    this.receiverLocations = receiverLocations;
-  }
-
-  @Override
   public Sender getSender(int minorFragmentId, PhysicalOperator child) {
-    return new OrderedPartitionSender(orderings, ref, child, 
receiverLocations, receiverMajorFragmentId, senderLocations.size(), 
recordsToSample,
-            samplingFactor, completionFactor);
+    return new OrderedPartitionSender(orderings, ref, child,
+        PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations),
+        receiverMajorFragmentId, senderLocations.size(), recordsToSample, 
samplingFactor, completionFactor);
   }
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, senderLocations);
+    return new UnorderedReceiver(senderMajorFragmentId, 
PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index 0a2b9be..2c9aeaf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -21,10 +21,10 @@ import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -38,7 +38,6 @@ public class OrderedPartitionSender extends AbstractSender {
 
   private final List<Ordering> orderings;
   private final FieldReference ref;
-  private final List<DrillbitEndpoint> endpoints;
   private final int sendingWidth;
 
   private int recordsToSample;
@@ -46,18 +45,22 @@ public class OrderedPartitionSender extends AbstractSender {
   private float completionFactor;
 
   @JsonCreator
-  public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> 
orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") 
PhysicalOperator child,
-                                @JsonProperty("destinations") 
List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
-                                @JsonProperty("sending-fragment-width") int 
sendingWidth, @JsonProperty("recordsToSample") int recordsToSample,
-                                @JsonProperty("samplingFactor") int 
samplingFactor, @JsonProperty("completionFactor") float completionFactor) {
-    super(oppositeMajorFragmentId, child);
+  public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> 
orderings,
+                                @JsonProperty("ref") FieldReference ref,
+                                @JsonProperty("child") PhysicalOperator child,
+                                @JsonProperty("destinations") 
List<MinorFragmentEndpoint> endpoints,
+                                @JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
+                                @JsonProperty("sending-fragment-width") int 
sendingWidth,
+                                @JsonProperty("recordsToSample") int 
recordsToSample,
+                                @JsonProperty("samplingFactor") int 
samplingFactor,
+                                @JsonProperty("completionFactor") float 
completionFactor) {
+    super(oppositeMajorFragmentId, child, endpoints);
     if (orderings == null) {
       this.orderings = Lists.newArrayList();
     } else {
       this.orderings = orderings;
     }
     this.ref = ref;
-    this.endpoints = endpoints;
     this.sendingWidth = sendingWidth;
     this.recordsToSample = recordsToSample;
     this.samplingFactor = samplingFactor;
@@ -68,10 +71,6 @@ public class OrderedPartitionSender extends AbstractSender {
     return sendingWidth;
   }
 
-  public List<DrillbitEndpoint> getDestinations() {
-    return endpoints;
-  }
-
   public List<Ordering> getOrderings() {
     return orderings;
   }
@@ -87,8 +86,8 @@ public class OrderedPartitionSender extends AbstractSender {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new OrderedPartitionSender(orderings, ref, child, endpoints, 
oppositeMajorFragmentId, sendingWidth, recordsToSample, samplingFactor,
-            completionFactor);
+    return new OrderedPartitionSender(orderings, ref, child, destinations, 
oppositeMajorFragmentId,
+        sendingWidth, recordsToSample, samplingFactor, completionFactor);
   }
 
   public int getRecordsToSample() {

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
index c8c8f43..e53fc0e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.physical.config;
 
+import java.util.Collections;
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -37,22 +40,15 @@ public class RangeSender extends AbstractSender{
 
   @JsonCreator
   public RangeSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, 
@JsonProperty("partitions") List<EndpointPartition> partitions) {
-    super(oppositeMajorFragmentId, child);
+    super(oppositeMajorFragmentId, child, 
Collections.<MinorFragmentEndpoint>emptyList());
     this.partitions = partitions;
   }
 
   @Override
-  public List<DrillbitEndpoint> getDestinations() {
-    return null;
-  }
-
-
-  @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
     return new RangeSender(oppositeMajorFragmentId, child, partitions);
   }
 
-
   public static class EndpointPartition{
     private final PartitionRange range;
     private final DrillbitEndpoint endpoint;

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 58c8e29..97c2405 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -48,7 +48,7 @@ public class Screen extends AbstractStore {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.singletonList(new EndpointAffinity(endpoint, 
1000000000000l));
+    return Collections.singletonList(new EndpointAffinity(endpoint, 
Double.POSITIVE_INFINITY));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index 2914112..c812325 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -20,13 +20,16 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
-import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -38,10 +41,6 @@ public class SingleMergeExchange extends AbstractExchange {
 
   private final List<Ordering> orderExpr;
 
-  // ephemeral for setup tasks
-  private List<CoordinationProtos.DrillbitEndpoint> senderLocations;
-  private CoordinationProtos.DrillbitEndpoint receiverLocation;
-
   @JsonCreator
   public SingleMergeExchange(@JsonProperty("child") PhysicalOperator child,
                              @JsonProperty("orderings") List<Ordering> 
orderExpr) {
@@ -50,39 +49,30 @@ public class SingleMergeExchange extends AbstractExchange {
   }
 
   @Override
-  public int getMaxSendWidth() {
-    return Integer.MAX_VALUE;
-  }
+  public ParallelizationInfo 
getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) {
+    Preconditions.checkArgument(senderFragmentEndpoints != null && 
senderFragmentEndpoints.size() > 0,
+        "Sender fragment endpoint list should not be empty");
 
-  @Override
-  public int getMaxReceiveWidth() {
-    return 1;
+    return ParallelizationInfo.create(1, 1, 
getDefaultAffinityMap(senderFragmentEndpoints));
   }
 
   @Override
-  protected void setupSenders(List<CoordinationProtos.DrillbitEndpoint> 
senderLocations) {
-    this.senderLocations = senderLocations;
-  }
-
-  @Override
-  protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> 
receiverLocations)
+  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations)
       throws PhysicalOperatorSetupException {
+    Preconditions.checkArgument(receiverLocations.size() == 1,
+      "SingleMergeExchange only supports a single receiver endpoint.");
 
-    if (receiverLocations.size() != 1) {
-      throw new PhysicalOperatorSetupException("SingleMergeExchange only 
supports a single receiver endpoint");
-    }
-    receiverLocation = receiverLocations.iterator().next();
-
+    super.setupReceivers(receiverLocations);
   }
 
   @Override
   public Sender getSender(int minorFragmentId, PhysicalOperator child) {
-    return new SingleSender(receiverMajorFragmentId, child, receiverLocation);
+    return new SingleSender(receiverMajorFragmentId, child, 
receiverLocations.iterator().next());
   }
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new MergingReceiverPOP(senderMajorFragmentId, senderLocations, 
orderExpr);
+    return new MergingReceiverPOP(senderMajorFragmentId, 
PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExpr);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
index 4a11a51..9745ae7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
@@ -19,7 +19,10 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -38,23 +41,45 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class SingleSender extends AbstractSender {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SingleSender.class);
 
-  private final DrillbitEndpoint destination;
-
+  /**
+   * Create a SingleSender which sends data to fragment identified by given 
MajorFragmentId and MinorFragmentId,
+   * and running at given endpoint
+   *
+   * @param oppositeMajorFragmentId MajorFragmentId of the receiver fragment.
+   * @param oppositeMinorFragmentId MinorFragmentId of the receiver fragment.
+   * @param child Child operator
+   * @param destination Drillbit endpoint where the receiver fragment is 
running.
+   */
   @JsonCreator
-  public SingleSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, 
@JsonProperty("destination") DrillbitEndpoint destination) {
-    super(oppositeMajorFragmentId, child);
-    this.destination = destination;
+  public SingleSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
+                      @JsonProperty("receiver-minor-fragment") int 
oppositeMinorFragmentId,
+                      @JsonProperty("child") PhysicalOperator child,
+                      @JsonProperty("destination") DrillbitEndpoint 
destination) {
+    super(oppositeMajorFragmentId, child,
+        Collections.singletonList(new 
MinorFragmentEndpoint(oppositeMinorFragmentId, destination)));
+  }
+
+  /**
+   * Create a SingleSender which sends data to fragment with MinorFragmentId 
as <i>0</i> in given opposite major
+   * fragment.
+   *
+   * @param oppositeMajorFragmentId MajorFragmentId of the receiver fragment.
+   * @param child Child operator
+   * @param destination Drillbit endpoint where the receiver fragment is 
running.
+   */
+  public SingleSender(int oppositeMajorFragmentId, PhysicalOperator child, 
DrillbitEndpoint destination) {
+    this(oppositeMajorFragmentId, 0 /* default opposite minor fragment id*/, 
child, destination);
   }
 
   @Override
-  @JsonIgnore
-  public List<DrillbitEndpoint> getDestinations() {
-    return Collections.singletonList(destination);
+  @JsonIgnore // Destination endpoint is exported via getDestination() and 
getOppositeMinorFragmentId()
+  public List<MinorFragmentEndpoint> getDestinations() {
+    return destinations;
   }
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new SingleSender(oppositeMajorFragmentId, child, destination);
+    return new SingleSender(oppositeMajorFragmentId, 
getOppositeMinorFragmentId(), child, getDestination());
   }
 
   @Override
@@ -62,9 +87,14 @@ public class SingleSender extends AbstractSender {
     return physicalVisitor.visitSingleSender(this, value);
   }
 
-
+  @JsonProperty("destination")
   public DrillbitEndpoint getDestination() {
-    return destination;
+    return getDestinations().get(0).getEndpoint();
+  }
+
+  @JsonProperty("receiver-minor-fragment")
+  public int getOppositeMinorFragmentId() {
+    return getDestinations().get(0).getId();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index cfc21ac..b7b7835 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractExchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -34,44 +37,39 @@ public class UnionExchange extends AbstractExchange{
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionExchange.class);
 
-  private List<DrillbitEndpoint> senderLocations;
-  private DrillbitEndpoint destinationLocation;
-
   public UnionExchange(@JsonProperty("child") PhysicalOperator child) {
     super(child);
   }
 
   @Override
+  public ParallelizationInfo 
getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) {
+    Preconditions.checkArgument(senderFragmentEndpoints != null && 
senderFragmentEndpoints.size() > 0,
+        "Sender fragment endpoint list should not be empty");
+
+    return ParallelizationInfo.create(1, 1, 
getDefaultAffinityMap(senderFragmentEndpoints));
+  }
+
+  @Override
   public void setupSenders(List<DrillbitEndpoint> senderLocations) {
     this.senderLocations = senderLocations;
   }
 
   @Override
   protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) 
throws PhysicalOperatorSetupException {
-    if (receiverLocations.size() != 1) {
-      throw new PhysicalOperatorSetupException("A Union Exchange only supports 
a single receiver endpoint.");
-    }
-    this.destinationLocation = receiverLocations.iterator().next();
+    Preconditions.checkArgument(receiverLocations.size() == 1,
+        "Union Exchange only supports a single receiver endpoint.");
+
+    super.setupReceivers(receiverLocations);
   }
 
   @Override
   public Sender getSender(int minorFragmentId, PhysicalOperator child) {
-    return new SingleSender(this.receiverMajorFragmentId, child, 
destinationLocation);
+    return new SingleSender(receiverMajorFragmentId, child, 
receiverLocations.get(0));
   }
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(this.senderMajorFragmentId, senderLocations);
-  }
-
-  @Override
-  public int getMaxSendWidth() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public int getMaxReceiveWidth() {
-    return 1;
+    return new UnorderedReceiver(senderMajorFragmentId, 
PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
   }
 
   @Override

Reply via email to