DRILL-133: LocalExchange planning and exec.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/59aae348 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/59aae348 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/59aae348 Branch: refs/heads/master Commit: 59aae348478d242d9e08c46df609e94c5436e63f Parents: 2a57136 Author: vkorukanti <venki.koruka...@gmail.com> Authored: Thu Jan 22 18:04:37 2015 -0800 Committer: vkorukanti <venki.koruka...@gmail.com> Committed: Sun Mar 8 22:49:18 2015 -0700 ---------------------------------------------------------------------- .../org/apache/drill/hbase/BaseHBaseTest.java | 6 +- .../apache/drill/exec/hive/HiveTestBase.java | 2 +- .../drill/exec/physical/EndpointAffinity.java | 70 +++- .../exec/physical/MinorFragmentEndpoint.java | 63 ++++ .../exec/physical/base/AbstractExchange.java | 68 +++- .../exec/physical/base/AbstractGroupScan.java | 8 +- .../exec/physical/base/AbstractReceiver.java | 21 +- .../exec/physical/base/AbstractSender.java | 24 +- .../drill/exec/physical/base/Exchange.java | 31 +- .../physical/base/PhysicalOperatorUtil.java | 23 ++ .../drill/exec/physical/base/Receiver.java | 9 +- .../apache/drill/exec/physical/base/Sender.java | 7 +- .../physical/config/AbstractDeMuxExchange.java | 143 ++++++++ .../physical/config/AbstractMuxExchange.java | 125 +++++++ .../exec/physical/config/BroadcastExchange.java | 28 +- .../exec/physical/config/BroadcastSender.java | 13 +- .../physical/config/HashPartitionSender.java | 15 +- .../physical/config/HashToMergeExchange.java | 31 +- .../physical/config/HashToRandomExchange.java | 31 +- .../physical/config/MergingReceiverPOP.java | 20 +- .../config/OrderedPartitionExchange.java | 30 +- .../physical/config/OrderedPartitionSender.java | 27 +- .../drill/exec/physical/config/RangeSender.java | 12 +- .../drill/exec/physical/config/Screen.java | 2 +- .../physical/config/SingleMergeExchange.java | 38 +- .../exec/physical/config/SingleSender.java | 52 ++- .../exec/physical/config/UnionExchange.java | 36 +- .../physical/config/UnorderedDeMuxExchange.java | 56 +++ .../physical/config/UnorderedMuxExchange.java | 55 +++ .../exec/physical/config/UnorderedReceiver.java | 21 +- .../exec/physical/impl/SingleSenderCreator.java | 23 +- .../BroadcastSenderRootExec.java | 36 +- .../impl/mergereceiver/MergingRecordBatch.java | 7 +- .../PartitionSenderRootExec.java | 10 +- .../partitionsender/PartitionerTemplate.java | 12 +- .../UnorderedReceiverBatch.java | 7 +- .../drill/exec/planner/fragment/Fragment.java | 37 +- .../planner/fragment/MakeFragmentsVisitor.java | 12 +- .../exec/planner/fragment/Materializer.java | 4 + .../planner/fragment/ParallelizationInfo.java | 139 +++++++ .../exec/planner/fragment/PlanningSet.java | 15 +- .../planner/fragment/SimpleParallelizer.java | 298 ++++++++++++--- .../drill/exec/planner/fragment/Stats.java | 28 +- .../exec/planner/fragment/StatsCollector.java | 144 ++++---- .../drill/exec/planner/fragment/Wrapper.java | 93 ++--- .../exec/planner/physical/PlannerSettings.java | 2 + .../physical/UnorderedDeMuxExchangePrel.java | 66 ++++ .../physical/UnorderedMuxExchangePrel.java | 56 +++ .../visitor/InsertLocalExchangeVisitor.java | 90 +++++ .../planner/sql/handlers/DefaultSqlHandler.java | 24 +- .../server/options/SystemOptionManager.java | 2 + .../exec/store/dfs/easy/EasyGroupScan.java | 1 - .../exec/store/direct/DirectGroupScan.java | 5 - .../exec/store/ischema/InfoSchemaGroupScan.java | 5 - .../drill/exec/store/mock/MockGroupScanPOP.java | 5 - .../drill/exec/store/sys/SystemTableScan.java | 6 - .../drill/exec/util/ArrayWrappedIntIntMap.java | 65 ++++ .../exec/work/batch/AbstractDataCollector.java | 53 ++- .../drill/exec/work/batch/MergingCollector.java | 5 +- .../exec/work/batch/PartitionedCollector.java | 4 +- .../apache/drill/exec/work/foreman/Foreman.java | 12 +- .../java/org/apache/drill/BaseTestQuery.java | 75 +++- .../java/org/apache/drill/PlanTestBase.java | 5 +- .../exec/compile/TestClassTransformation.java | 4 +- .../exec/physical/impl/TestLocalExchange.java | 358 +++++++++++++++++++ .../apache/drill/exec/pop/PopUnitTestBase.java | 13 +- .../drill/exec/pop/TestFragmentChecker.java | 13 +- .../store/parquet/ParquetRecordReaderTest.java | 6 +- .../exec/util/TestArrayWrappedIntIntMap.java | 83 +++++ pom.xml | 3 + 70 files changed, 2236 insertions(+), 657 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index 1152b7b..b955d3b 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -24,6 +24,7 @@ import org.apache.drill.BaseTestQuery; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.hbase.HBaseStoragePlugin; import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig; import org.apache.hadoop.conf.Configuration; @@ -54,11 +55,12 @@ public class BaseHBaseTest extends BaseTestQuery { HBaseTestsSuite.configure(true, true); HBaseTestsSuite.initCluster(); - storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin(HBASE_STORAGE_PLUGIN_NAME); + final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage(); + storagePlugin = (HBaseStoragePlugin) pluginRegistry.getPlugin(HBASE_STORAGE_PLUGIN_NAME); storagePluginConfig = storagePlugin.getConfig(); storagePluginConfig.setEnabled(true); storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort()); - bit.getContext().getStorage().createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, storagePluginConfig, true); + pluginRegistry.createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, storagePluginConfig, true); } @AfterClass http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java index 7e3b6c8..1c7e16d 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java @@ -31,7 +31,7 @@ public class HiveTestBase extends PlanTestBase { @BeforeClass public static void generateHive() throws Exception{ - hiveTest = new HiveTestDataGenerator(bit.getContext().getStorage()); + hiveTest = new HiveTestDataGenerator(getDrillbitContext().getStorage()); hiveTest.createAndAddHiveTestPlugin(); } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java index df31f74..4d8e23f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java @@ -17,50 +17,86 @@ */ package org.apache.drill.exec.physical; +import com.google.common.base.Preconditions; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.protobuf.TextFormat; - -public class EndpointAffinity implements Comparable<EndpointAffinity>{ +/** + * EndpointAffinity captures affinity value for a given single Drillbit endpoint. + */ +public class EndpointAffinity { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class); - private DrillbitEndpoint endpoint; - private float affinity = 0.0f; + private final DrillbitEndpoint endpoint; + private double affinity = 0.0d; + /** + * Create EndpointAffinity instance for given Drillbit endpoint. Affinity is initialized to 0. Affinity can be added + * after EndpointAffinity object creation using {@link #addAffinity(double)}. + * + * @param endpoint Drillbit endpoint. + */ public EndpointAffinity(DrillbitEndpoint endpoint) { - super(); this.endpoint = endpoint; } - public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) { - super(); + /** + * Create EndpointAffinity instance for given Drillbit endpoint and affinity initialized to given affinity value. + * Affinity can be added after EndpointAffinity object creation using {@link #addAffinity(double)}. + * + * @param endpoint Drillbit endpoint. + * @param affinity Initial affinity value. + */ + public EndpointAffinity(DrillbitEndpoint endpoint, double affinity) { this.endpoint = endpoint; this.affinity = affinity; } + /** + * Return the Drillbit endpoint in this instance. + * + * @return Drillbit endpoint. + */ public DrillbitEndpoint getEndpoint() { return endpoint; } - public void setEndpoint(DrillbitEndpoint endpoint) { - this.endpoint = endpoint; - } - public float getAffinity() { + + /** + * Get the affinity value. Affinity value is Double.POSITIVE_INFINITY if the Drillbit endpoint requires an assignment. + * + * @return affinity value + */ + public double getAffinity() { return affinity; } - @Override - public int compareTo(EndpointAffinity o) { - return Float.compare(affinity, o.affinity); + /** + * Add given affinity value to existing affinity value. + * + * @param f Affinity value (must be a non-negative value). + * @throws java.lang.IllegalArgumentException If the given affinity value is negative. + */ + public void addAffinity(double f){ + Preconditions.checkArgument(f >= 0.0d, "Affinity should not be negative"); + if (Double.POSITIVE_INFINITY == f) { + affinity = f; + } else if (Double.POSITIVE_INFINITY != affinity) { + affinity += f; + } } - public void addAffinity(float f){ - affinity += f; + /** + * Is this endpoint required to be in fragment endpoint assignment list? + * + * @return Returns true for mandatory assignment, false otherwise. + */ + public boolean isAssignmentRequired() { + return Double.POSITIVE_INFINITY == affinity; } @Override public String toString() { return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]"; } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java new file mode 100644 index 0000000..c33836c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +/** + * MinorFragmentEndpoint represents fragment's MinorFragmentId and Drillbit endpoint to which the fragment is + * assigned for execution. + */ +@JsonTypeName("fragment-endpoint") +public class MinorFragmentEndpoint { + private final int id; + private final DrillbitEndpoint endpoint; + + @JsonCreator + public MinorFragmentEndpoint(@JsonProperty("minorFragmentId") int id, @JsonProperty("endpoint") DrillbitEndpoint endpoint) { + this.id = id; + this.endpoint = endpoint; + } + + /** + * Get the minor fragment id. + * @return Minor fragment id. + */ + @JsonProperty("minorFragmentId") + public int getId() { + return id; + } + + /** + * Get the Drillbit endpoint where the fragment is assigned for execution. + * + * @return Drillbit endpoint. + */ + @JsonProperty("endpoint") + public DrillbitEndpoint getEndpoint() { + return endpoint; + } + + @Override + public String toString() { + return "FragmentEndPoint: id = " + id + ", ep = " + endpoint; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java index 73280ea..5fbe838 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java @@ -17,16 +17,26 @@ */ package org.apache.drill.exec.physical.base; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; public abstract class AbstractExchange extends AbstractSingle implements Exchange { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class); + // Ephemeral info for generating execution fragments. protected int senderMajorFragmentId; protected int receiverMajorFragmentId; + protected List<DrillbitEndpoint> senderLocations; + protected List<DrillbitEndpoint> receiverLocations; public AbstractExchange(PhysicalOperator child) { super(child); @@ -41,13 +51,58 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang return false; } + /** + * Default sender parallelization width range is [1, Integer.MAX_VALUE] and no endpoint affinity + * @param receiverFragmentEndpoints Endpoints assigned to receiver fragment if available, otherwise an empty list. + * @return + */ + @Override + public ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints) { + return ParallelizationInfo.UNLIMITED_WIDTH_NO_ENDPOINT_AFFINITY; + } + + /** + * Default receiver parallelization width range is [1, Integer.MAX_VALUE] and affinity to nodes where sender + * fragments are running. + * @param senderFragmentEndpoints Endpoints assigned to receiver fragment if available, otherwise an empty list. + * @return + */ @Override - public int getMaxReceiveWidth() { - return Integer.MAX_VALUE; + public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) { + Preconditions.checkArgument(senderFragmentEndpoints != null && senderFragmentEndpoints.size() > 0, + "Sender fragment endpoint list should not be empty"); + + return ParallelizationInfo.create(1, Integer.MAX_VALUE, getDefaultAffinityMap(senderFragmentEndpoints)); } - protected abstract void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ; - protected abstract void setupReceivers(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ; + /** + * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrances + * in given endpoint list. + * + * @param fragmentEndpoints Drillbit endpoint assignments of fragments. + * @return List of EndpointAffinity objects for each Drillbit endpoint given <i>fragmentEndpoints</i>. + */ + protected static List<EndpointAffinity> getDefaultAffinityMap(List<DrillbitEndpoint> fragmentEndpoints) { + Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap(); + final double affinityPerOccurrence = 1.0d / fragmentEndpoints.size(); + for(DrillbitEndpoint sender : fragmentEndpoints) { + if (affinityMap.containsKey(sender)) { + affinityMap.get(sender).addAffinity(affinityPerOccurrence); + } else { + affinityMap.put(sender, new EndpointAffinity(sender, affinityPerOccurrence)); + } + } + + return new ArrayList(affinityMap.values()); + } + + protected void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException { + this.senderLocations = ImmutableList.copyOf(senderLocations); + } + + protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { + this.receiverLocations = ImmutableList.copyOf(receiverLocations); + } @Override public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException { @@ -72,5 +127,8 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang throw new UnsupportedOperationException(); } - + @Override + public ParallelizationDependency getParallelizationDependency() { + return ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 5d0d9bf..276ecb5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.base; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -24,17 +25,22 @@ import org.apache.drill.common.expression.SchemaPath; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Iterators; +import org.apache.drill.exec.physical.EndpointAffinity; public abstract class AbstractGroupScan extends AbstractBase implements GroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class); - @Override public Iterator<PhysicalOperator> iterator() { return Iterators.emptyIterator(); } @Override + public List<EndpointAffinity> getOperatorAffinity() { + return Collections.emptyList(); + } + + @Override public boolean isExecutable() { return false; } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java index f621a26..f01d025 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java @@ -20,18 +20,28 @@ package org.apache.drill.exec.physical.base; import java.util.Iterator; import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; public abstract class AbstractReceiver extends AbstractBase implements Receiver{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractReceiver.class); private final int oppositeMajorFragmentId; + private final List<MinorFragmentEndpoint> senders; - public AbstractReceiver(int oppositeMajorFragmentId){ + /** + * @param oppositeMajorFragmentId MajorFragmentId of fragments that are sending data to this receiver. + * @param senders List of sender MinorFragmentEndpoints each containing sender MinorFragmentId and Drillbit endpoint + * where it is running. + */ + public AbstractReceiver(int oppositeMajorFragmentId, List<MinorFragmentEndpoint> senders){ this.oppositeMajorFragmentId = oppositeMajorFragmentId; + this.senders = ImmutableList.copyOf(senders); } @Override @@ -56,5 +66,14 @@ public abstract class AbstractReceiver extends AbstractBase implements Receiver{ return oppositeMajorFragmentId; } + @JsonProperty("senders") + public List<MinorFragmentEndpoint> getProvidingEndpoints() { + return senders; + } + + @JsonIgnore + public int getNumSenders() { + return senders.size(); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java index 53a0721..2e42f26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java @@ -18,16 +18,29 @@ package org.apache.drill.exec.physical.base; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import java.util.List; public abstract class AbstractSender extends AbstractSingle implements Sender { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSender.class); protected final int oppositeMajorFragmentId; - - public AbstractSender(int oppositeMajorFragmentId, PhysicalOperator child) { + // + protected final List<MinorFragmentEndpoint> destinations; + + /** + * @param oppositeMajorFragmentId MajorFragmentId of fragments that are receiving data sent by this sender. + * @param child Child PhysicalOperator which is providing data to this Sender. + * @param destinations List of receiver MinorFragmentEndpoints each containing MinorFragmentId and Drillbit endpoint + * where it is running. + */ + public AbstractSender(int oppositeMajorFragmentId, PhysicalOperator child, List<MinorFragmentEndpoint> destinations) { super(child); this.oppositeMajorFragmentId = oppositeMajorFragmentId; + this.destinations = ImmutableList.copyOf(destinations); } @Override @@ -36,9 +49,14 @@ public abstract class AbstractSender extends AbstractSingle implements Sender { } @Override + @JsonProperty("receiver-major-fragment") public int getOppositeMajorFragmentId() { return oppositeMajorFragmentId; } - + @Override + @JsonProperty("destinations") + public List<MinorFragmentEndpoint> getDestinations() { + return destinations; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java index 7be7f20..d8014f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base; import java.util.List; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -27,6 +28,19 @@ import com.fasterxml.jackson.annotation.JsonIgnore; public interface Exchange extends PhysicalOperator { /** + * Exchanges are fragment boundaries in physical operator tree. It is divided into two parts. First part is Sender + * which becomes part of the sending fragment. Second part is Receiver which becomes part of the fragment that + * receives the data. + * + * Assignment dependency describes whether sender fragments depend on receiver fragment's endpoint assignment for + * determining its parallelization and endpoint assignment and vice versa. + */ + public enum ParallelizationDependency { + SENDER_DEPENDS_ON_RECEIVER, // Sending fragment depends on receiving fragment for parallelization + RECEIVER_DEPENDS_ON_SENDER, // Receiving fragment depends on sending fragment for parallelization (default value). + } + + /** * Inform this Exchange node about its sender locations. This list should be index-ordered the same as the expected * minorFragmentIds for each sender. * @@ -65,20 +79,24 @@ public interface Exchange extends PhysicalOperator { public abstract Receiver getReceiver(int minorFragmentId); /** - * The widest width this sender can send (max sending parallelization). Typically Integer.MAX_VALUE. + * Provide parallelization parameters for sender side of the exchange. Output includes min width, + * max width and affinity to Drillbits. * + * @param receiverFragmentEndpoints Endpoints assigned to receiver fragment if available, otherwise an empty list. * @return */ @JsonIgnore - public abstract int getMaxSendWidth(); + public abstract ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints); /** - * The widest width this receiver can receive(max receive parallelization). Default is Integer.MAX_VALUE. + * Provide parallelization parameters for receiver side of the exchange. Output includes min width, + * max width and affinity to Drillbits. * + * @param senderFragmentEndpoints Endpoints assigned to receiver fragment if available, otherwise an empty list * @return */ @JsonIgnore - public abstract int getMaxReceiveWidth(); + public abstract ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints); /** * Return the feeding child of this operator node. @@ -87,4 +105,9 @@ public interface Exchange extends PhysicalOperator { */ public PhysicalOperator getChild(); + /** + * Get the parallelization dependency of the Exchange. + */ + @JsonIgnore + public ParallelizationDependency getParallelizationDependency(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java index dfcb113..487ce77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java @@ -17,9 +17,14 @@ */ package org.apache.drill.exec.physical.base; +import com.google.common.collect.Lists; import org.apache.drill.common.config.CommonConstants; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.PathScanner; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +import java.util.List; public class PhysicalOperatorUtil { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class); @@ -31,4 +36,22 @@ public class PhysicalOperatorUtil { logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) ); return ops; } + + /** + * Helper method to create a list of MinorFragmentEndpoint instances from a given endpoint assignment list. + * + * @param endpoints Assigned endpoint list. Index of each endpoint in list indicates the MinorFragmentId of the + * fragment that is assigned to the endpoint. + * @return + */ + public static List<MinorFragmentEndpoint> getIndexOrderedEndpoints(List<DrillbitEndpoint> endpoints) { + List<MinorFragmentEndpoint> destinations = Lists.newArrayList(); + int minorFragmentId = 0; + for(DrillbitEndpoint endpoint : endpoints) { + destinations.add(new MinorFragmentEndpoint(minorFragmentId, endpoint)); + minorFragmentId++; + } + + return destinations; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java index 0c67770..04d6d7e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base; import java.util.List; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; @@ -31,10 +31,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; public interface Receiver extends FragmentLeaf { /** - * A receiver is expecting streams from one or more providing endpoints. This method should return a list of the expected sending endpoints. - * @return List of counterpart sending DrillbitEndpoints. + * A receiver is expecting streams from one or more providing endpoints. + * @return List of sender MinorFragmentEndpoints each containing sender fragment MinorFragmentId and endpoint where + * it is running. */ - public abstract List<DrillbitEndpoint> getProvidingEndpoints(); + public abstract List<MinorFragmentEndpoint> getProvidingEndpoints(); /** * Whether or not this receive supports out of order exchange. This provides a hint for the scheduling node on whether http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java index bbd1b2c..e4f2adf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base; import java.util.List; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import com.fasterxml.jackson.annotation.JsonProperty; @@ -31,9 +31,10 @@ public interface Sender extends FragmentRoot { /** * Get the list of destination endpoints that this Sender will be communicating with. - * @return List of DrillbitEndpoints. + * @return List of receiver MinorFragmentEndpoints each containing receiver fragment MinorFragmentId and endpoint + * where it is running. */ - public abstract List<DrillbitEndpoint> getDestinations(); + public abstract List<MinorFragmentEndpoint> getDestinations(); /** * Get the receiver major fragment id that is opposite this sender. http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java new file mode 100644 index 0000000..352e84b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.config; + +import java.util.List; +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.AbstractExchange; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Sender; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * DeMuxExchange is opposite of MuxExchange. It is used when the sender has overhead that is proportional to the + * number of receivers. DeMuxExchange is run one instance per Drillbit endpoint which collects and distributes data + * belonging to local receiving fragments running on the same Drillbit. + * + * Example: + * On a 3 node cluster, if the sender has 10 receivers on each node each sender requires 30 buffers. By inserting + * DeMuxExchange, we create one receiver per node which means total of 3 receivers for each sender. If the number of + * senders is 10, we use 10*3 buffers instead of 10*30. DeMuxExchange has a overhead of buffer space that is equal to + * number of local receivers. In this case each DeMuxExchange needs 10 buffers, so total of 3*10 buffers. + */ +public abstract class AbstractDeMuxExchange extends AbstractExchange { + protected final LogicalExpression expr; + + // Ephemeral info used when creating execution fragments. + protected Map<Integer, MinorFragmentEndpoint> receiverToSenderMapping; + protected ArrayListMultimap<Integer, MinorFragmentEndpoint> senderToReceiversMapping; + private boolean isSenderReceiverMappingCreated; + + public AbstractDeMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) { + super(child); + this.expr = expr; + } + + @JsonProperty("expr") + public LogicalExpression getExpression(){ + return expr; + } + + @Override + public ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints) { + Preconditions.checkArgument(receiverFragmentEndpoints != null && receiverFragmentEndpoints.size() > 0, + "Receiver fragment endpoint list should not be empty"); + + // We want to run one demux sender per Drillbit endpoint. + // Identify the number of unique Drillbit endpoints in receiver fragment endpoints. + List<DrillbitEndpoint> drillbitEndpoints = ImmutableSet.copyOf(receiverFragmentEndpoints).asList(); + + List<EndpointAffinity> affinities = Lists.newArrayList(); + for(DrillbitEndpoint ep : drillbitEndpoints) { + affinities.add(new EndpointAffinity(ep, Double.POSITIVE_INFINITY)); + } + + return ParallelizationInfo.create(affinities.size(), affinities.size(), affinities); + } + + @Override + public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) { + return ParallelizationInfo.UNLIMITED_WIDTH_NO_ENDPOINT_AFFINITY; + } + + @Override + public Sender getSender(int minorFragmentId, PhysicalOperator child) { + createSenderReceiverMapping(); + + List<MinorFragmentEndpoint> receivers = senderToReceiversMapping.get(minorFragmentId); + if (receivers == null || receivers.size() <= 0) { + throw new IllegalStateException(String.format("Failed to find receivers for sender [%d]", minorFragmentId)); + } + + return new HashPartitionSender(receiverMajorFragmentId, child, expr, receivers); + } + + /** + * In DeMuxExchange, sender fragment parallelization and endpoint assignment depends on receiver fragment endpoint + * assignments. + */ + @Override + public ParallelizationDependency getParallelizationDependency() { + return ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER; + } + + protected void createSenderReceiverMapping() { + if (isSenderReceiverMappingCreated) { + return; + } + + senderToReceiversMapping = ArrayListMultimap.create(); + receiverToSenderMapping = Maps.newHashMap(); + + // Find the list of receiver fragment ids assigned to each Drillbit endpoint + ArrayListMultimap<DrillbitEndpoint, Integer> endpointReceiverList = ArrayListMultimap.create(); + + int receiverFragmentId = 0; + for(DrillbitEndpoint receiverLocation : receiverLocations) { + endpointReceiverList.put(receiverLocation, receiverFragmentId); + receiverFragmentId++; + } + + int senderFragmentId = 0; + for(DrillbitEndpoint senderLocation : senderLocations) { + final List<Integer> receiverMinorFragmentIds = endpointReceiverList.get(senderLocation); + + for(Integer receiverId : receiverMinorFragmentIds) { + receiverToSenderMapping.put(receiverId, new MinorFragmentEndpoint(senderFragmentId, senderLocation)); + + senderToReceiversMapping.put(senderFragmentId, + new MinorFragmentEndpoint(receiverId, receiverLocations.get(receiverId))); + } + senderFragmentId++; + } + + isSenderReceiverMappingCreated = true; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java new file mode 100644 index 0000000..6715f96 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.config; + +import java.util.List; +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.AbstractExchange; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Sender; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Multiplexing Exchange (MuxExchange) is used when results from multiple minor fragments belonging to the same + * major fragment running on a node need to be collected at one fragment on the same node before distributing the + * results further. This helps when the sender that is distributing the results has overhead that is proportional to + * the number of sender instances. An example of such sender is PartitionSender. Each instance of PartitionSender + * allocates "r" buffers where "r" is the number of receivers. + * + * Ex. Drillbit A is assigned 10 minor fragments belonging to the same major fragment. Each of these fragments + * has a PartitionSender instance which is sending data to 300 receivers. Each PartitionSender needs 300 buffers, + * so total of 10*300 buffers are needed. With MuxExchange, all 10 fragments send the data directly (without + * partitioning) to MuxExchange which uses the PartitionSender to partition the incoming data and distribute + * to receivers. MuxExchange has only one instance per Drillbit per major fragment which means only one instance of + * PartitionSender per Drillbit per major fragment. With MuxExchange total number of buffers used by PartitionSender + * for the 10 fragments is 300 instead of earlier number 10*300. + */ +public abstract class AbstractMuxExchange extends AbstractExchange { + + // Ephemeral info used when creating execution fragments. + protected Map<Integer, MinorFragmentEndpoint> senderToReceiverMapping; + protected ArrayListMultimap<Integer, MinorFragmentEndpoint> receiverToSenderMapping; + private boolean isSenderReceiverMappingCreated; + + public AbstractMuxExchange(@JsonProperty("child") PhysicalOperator child) { + super(child); + } + + @Override + public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) { + Preconditions.checkArgument(senderFragmentEndpoints != null && senderFragmentEndpoints.size() > 0, + "Sender fragment endpoint list should not be empty"); + + // We want to run one mux receiver per Drillbit endpoint. + // Identify the number of unique Drillbit endpoints in sender fragment endpoints. + List<DrillbitEndpoint> drillbitEndpoints = ImmutableSet.copyOf(senderFragmentEndpoints).asList(); + + List<EndpointAffinity> affinities = Lists.newArrayList(); + for(DrillbitEndpoint ep : drillbitEndpoints) { + affinities.add(new EndpointAffinity(ep, Double.POSITIVE_INFINITY)); + } + + return ParallelizationInfo.create(affinities.size(), affinities.size(), affinities); + } + + @Override + public Sender getSender(int minorFragmentId, PhysicalOperator child) { + createSenderReceiverMapping(); + + MinorFragmentEndpoint receiver = senderToReceiverMapping.get(minorFragmentId); + if (receiver == null) { + throw new IllegalStateException(String.format("Failed to find receiver for sender [%d]", minorFragmentId)); + } + + return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint()); + } + + protected void createSenderReceiverMapping() { + if (isSenderReceiverMappingCreated) { + return; + } + + senderToReceiverMapping = Maps.newHashMap(); + receiverToSenderMapping = ArrayListMultimap.create(); + + // Find the list of sender fragment ids assigned to each Drillbit endpoint. + ArrayListMultimap<DrillbitEndpoint, Integer> endpointSenderList = ArrayListMultimap.create(); + + int senderFragmentId = 0; + for(DrillbitEndpoint senderLocation : senderLocations) { + endpointSenderList.put(senderLocation, senderFragmentId); + senderFragmentId++; + } + + int receiverFragmentId = 0; + for(DrillbitEndpoint receiverLocation : receiverLocations) { + List<Integer> senderFragmentIds = endpointSenderList.get(receiverLocation); + + for(Integer senderId : senderFragmentIds) { + senderToReceiverMapping.put(senderId, new MinorFragmentEndpoint(receiverFragmentId, receiverLocation)); + + receiverToSenderMapping.put(receiverFragmentId, + new MinorFragmentEndpoint(senderId, senderLocations.get(senderId))); + } + receiverFragmentId++; + } + + isSenderReceiverMappingCreated = true; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java index 73a1d20..a37f638 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java @@ -17,15 +17,12 @@ ******************************************************************************/ package org.apache.drill.exec.physical.config; -import java.util.List; - import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; -import org.apache.drill.exec.proto.CoordinationProtos; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -34,41 +31,24 @@ import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("broadcast-exchange") public class BroadcastExchange extends AbstractExchange { - private List<DrillbitEndpoint> senderLocations; - private List<DrillbitEndpoint> receiverLocations; - @JsonCreator public BroadcastExchange(@JsonProperty("child") PhysicalOperator child) { super(child); } @Override - protected void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException { - this.senderLocations = senderLocations; - } - - @Override - protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { - this.receiverLocations = receiverLocations; - } - - @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { return new BroadcastExchange(child); } @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException { - return new BroadcastSender(receiverMajorFragmentId, child, receiverLocations); + return new BroadcastSender(receiverMajorFragmentId, child, + PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations)); } @Override public Receiver getReceiver(int minorFragmentId) { - return new UnorderedReceiver(senderMajorFragmentId, senderLocations); - } - - @Override - public int getMaxSendWidth() { - return Integer.MAX_VALUE; + return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations)); } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java index 1827367..7ddfa95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java @@ -20,10 +20,10 @@ package org.apache.drill.exec.physical.config; import java.util.List; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractSender; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; @@ -33,14 +33,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("broadcast-sender") public class BroadcastSender extends AbstractSender { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSender.class); - private final List<DrillbitEndpoint> destinations; @JsonCreator public BroadcastSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, - @JsonProperty("destinations") List<DrillbitEndpoint> destinations) { - super(oppositeMajorFragmentId, child); - this.destinations = destinations; + @JsonProperty("destinations") List<MinorFragmentEndpoint> destinations) { + super(oppositeMajorFragmentId, child, destinations); } @Override @@ -49,11 +47,6 @@ public class BroadcastSender extends AbstractSender { } @Override - public List<DrillbitEndpoint> getDestinations() { - return destinations; - } - - @Override public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { return physicalVisitor.visitBroadcastSender(this, value); } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java index bdb1362..52db80d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java @@ -20,10 +20,10 @@ package org.apache.drill.exec.physical.config; import java.util.List; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractSender; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; @@ -34,27 +34,20 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class HashPartitionSender extends AbstractSender { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class); - private final List<DrillbitEndpoint> endpoints; private final LogicalExpression expr; @JsonCreator public HashPartitionSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, - @JsonProperty("destinations") List<DrillbitEndpoint> endpoints) { - super(oppositeMajorFragmentId, child); + @JsonProperty("destinations") List<MinorFragmentEndpoint> endpoints) { + super(oppositeMajorFragmentId, child, endpoints); this.expr = expr; - this.endpoints = endpoints; - } - - @Override - public List<DrillbitEndpoint> getDestinations() { - return endpoints; } @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new HashPartitionSender(oppositeMajorFragmentId, child, expr, endpoints); + return new HashPartitionSender(oppositeMajorFragmentId, child, expr, destinations); } public LogicalExpression getExpr() { http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java index f62d922..f45ace9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java @@ -23,9 +23,9 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -35,14 +35,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class HashToMergeExchange extends AbstractExchange{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class); - private final LogicalExpression distExpr; private final List<Ordering> orderExprs; - //ephemeral for setup tasks. - private List<DrillbitEndpoint> senderLocations; - private List<DrillbitEndpoint> receiverLocations; - @JsonCreator public HashToMergeExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, @@ -53,29 +48,14 @@ public class HashToMergeExchange extends AbstractExchange{ } @Override - public int getMaxSendWidth() { - return Integer.MAX_VALUE; - } - - - @Override - protected void setupSenders(List<DrillbitEndpoint> senderLocations) { - this.senderLocations = senderLocations; - } - - @Override - protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) { - this.receiverLocations = receiverLocations; - } - - @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) { - return new HashPartitionSender(receiverMajorFragmentId, child, distExpr, receiverLocations); + return new HashPartitionSender(receiverMajorFragmentId, child, distExpr, + PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations)); } @Override public Receiver getReceiver(int minorFragmentId) { - return new MergingReceiverPOP(senderMajorFragmentId, senderLocations, orderExprs); + return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExprs); } @Override @@ -87,7 +67,4 @@ public class HashToMergeExchange extends AbstractExchange{ public List<Ordering> getOrderExpressions(){ return orderExprs; } - - - } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java index fac374b..52d79c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java @@ -22,9 +22,9 @@ import java.util.List; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -34,13 +34,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class HashToRandomExchange extends AbstractExchange{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToRandomExchange.class); - private final LogicalExpression expr; - //ephemeral for setup tasks. - private List<DrillbitEndpoint> senderLocations; - private List<DrillbitEndpoint> receiverLocations; - @JsonCreator public HashToRandomExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) { super(child); @@ -48,29 +43,14 @@ public class HashToRandomExchange extends AbstractExchange{ } @Override - public int getMaxSendWidth() { - return Integer.MAX_VALUE; - } - - - @Override - protected void setupSenders(List<DrillbitEndpoint> senderLocations) { - this.senderLocations = senderLocations; - } - - @Override - protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) { - this.receiverLocations = receiverLocations; - } - - @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) { - return new HashPartitionSender(receiverMajorFragmentId, child, expr, receiverLocations); + return new HashPartitionSender(receiverMajorFragmentId, child, expr, + PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations)); } @Override public Receiver getReceiver(int minorFragmentId) { - return new UnorderedReceiver(senderMajorFragmentId, senderLocations); + return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations)); } @Override @@ -82,7 +62,4 @@ public class HashToRandomExchange extends AbstractExchange{ public LogicalExpression getExpression(){ return expr; } - - - } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java index f5dca1a..9416814 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java @@ -20,13 +20,12 @@ package org.apache.drill.exec.physical.config; import java.util.List; import org.apache.drill.common.logical.data.Order.Ordering; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -38,25 +37,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class MergingReceiverPOP extends AbstractReceiver{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverPOP.class); - private final List<DrillbitEndpoint> senders; private final List<Ordering> orderings; @JsonCreator public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, - @JsonProperty("senders") List<DrillbitEndpoint> senders, + @JsonProperty("senders") List<MinorFragmentEndpoint> senders, @JsonProperty("orderings") List<Ordering> orderings) { - super(oppositeMajorFragmentId); - this.senders = senders; + super(oppositeMajorFragmentId, senders); this.orderings = orderings; } @Override - @JsonProperty("senders") - public List<DrillbitEndpoint> getProvidingEndpoints() { - return senders; - } - - @Override public boolean supportsOutOfOrderExchange() { return false; } @@ -74,9 +65,4 @@ public class MergingReceiverPOP extends AbstractReceiver{ public int getOperatorType() { return CoreOperatorType.MERGING_RECEIVER_VALUE; } - - @JsonIgnore - public int getNumSenders() { - return senders.size(); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java index 8e1526a..c8dbc22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java @@ -23,9 +23,9 @@ import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -36,17 +36,12 @@ import com.google.common.base.Preconditions; public class OrderedPartitionExchange extends AbstractExchange { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionExchange.class); - private final List<Ordering> orderings; private final FieldReference ref; private int recordsToSample = 10000; // How many records must be received before analyzing private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of partitions to send to distributed cache private float completionFactor = .75f; // What fraction of fragments must be completed before attempting to build partition table - //ephemeral for setup tasks. - private List<DrillbitEndpoint> senderLocations; - private List<DrillbitEndpoint> receiverLocations; - @JsonCreator public OrderedPartitionExchange(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample, @@ -70,30 +65,15 @@ public class OrderedPartitionExchange extends AbstractExchange { } @Override - public int getMaxSendWidth() { - return Integer.MAX_VALUE; - } - - - @Override - protected void setupSenders(List<DrillbitEndpoint> senderLocations) { - this.senderLocations = senderLocations; - } - - @Override - protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) { - this.receiverLocations = receiverLocations; - } - - @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) { - return new OrderedPartitionSender(orderings, ref, child, receiverLocations, receiverMajorFragmentId, senderLocations.size(), recordsToSample, - samplingFactor, completionFactor); + return new OrderedPartitionSender(orderings, ref, child, + PhysicalOperatorUtil.getIndexOrderedEndpoints(receiverLocations), + receiverMajorFragmentId, senderLocations.size(), recordsToSample, samplingFactor, completionFactor); } @Override public Receiver getReceiver(int minorFragmentId) { - return new UnorderedReceiver(senderMajorFragmentId, senderLocations); + return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations)); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java index 0a2b9be..2c9aeaf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java @@ -21,10 +21,10 @@ import java.util.List; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.Order.Ordering; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractSender; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; @@ -38,7 +38,6 @@ public class OrderedPartitionSender extends AbstractSender { private final List<Ordering> orderings; private final FieldReference ref; - private final List<DrillbitEndpoint> endpoints; private final int sendingWidth; private int recordsToSample; @@ -46,18 +45,22 @@ public class OrderedPartitionSender extends AbstractSender { private float completionFactor; @JsonCreator - public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child, - @JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, - @JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample, - @JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) { - super(oppositeMajorFragmentId, child); + public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> orderings, + @JsonProperty("ref") FieldReference ref, + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("destinations") List<MinorFragmentEndpoint> endpoints, + @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, + @JsonProperty("sending-fragment-width") int sendingWidth, + @JsonProperty("recordsToSample") int recordsToSample, + @JsonProperty("samplingFactor") int samplingFactor, + @JsonProperty("completionFactor") float completionFactor) { + super(oppositeMajorFragmentId, child, endpoints); if (orderings == null) { this.orderings = Lists.newArrayList(); } else { this.orderings = orderings; } this.ref = ref; - this.endpoints = endpoints; this.sendingWidth = sendingWidth; this.recordsToSample = recordsToSample; this.samplingFactor = samplingFactor; @@ -68,10 +71,6 @@ public class OrderedPartitionSender extends AbstractSender { return sendingWidth; } - public List<DrillbitEndpoint> getDestinations() { - return endpoints; - } - public List<Ordering> getOrderings() { return orderings; } @@ -87,8 +86,8 @@ public class OrderedPartitionSender extends AbstractSender { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new OrderedPartitionSender(orderings, ref, child, endpoints, oppositeMajorFragmentId, sendingWidth, recordsToSample, samplingFactor, - completionFactor); + return new OrderedPartitionSender(orderings, ref, child, destinations, oppositeMajorFragmentId, + sendingWidth, recordsToSample, samplingFactor, completionFactor); } public int getRecordsToSample() { http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java index c8c8f43..e53fc0e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java @@ -17,8 +17,11 @@ */ package org.apache.drill.exec.physical.config; +import java.util.Collections; import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractSender; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -37,22 +40,15 @@ public class RangeSender extends AbstractSender{ @JsonCreator public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) { - super(oppositeMajorFragmentId, child); + super(oppositeMajorFragmentId, child, Collections.<MinorFragmentEndpoint>emptyList()); this.partitions = partitions; } @Override - public List<DrillbitEndpoint> getDestinations() { - return null; - } - - - @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { return new RangeSender(oppositeMajorFragmentId, child, partitions); } - public static class EndpointPartition{ private final PartitionRange range; private final DrillbitEndpoint endpoint; http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java index 58c8e29..97c2405 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java @@ -48,7 +48,7 @@ public class Screen extends AbstractStore { @Override public List<EndpointAffinity> getOperatorAffinity() { - return Collections.singletonList(new EndpointAffinity(endpoint, 1000000000000l)); + return Collections.singletonList(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY)); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java index 2914112..c812325 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java @@ -20,13 +20,16 @@ package org.apache.drill.exec.physical.config; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; -import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -38,10 +41,6 @@ public class SingleMergeExchange extends AbstractExchange { private final List<Ordering> orderExpr; - // ephemeral for setup tasks - private List<CoordinationProtos.DrillbitEndpoint> senderLocations; - private CoordinationProtos.DrillbitEndpoint receiverLocation; - @JsonCreator public SingleMergeExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderExpr) { @@ -50,39 +49,30 @@ public class SingleMergeExchange extends AbstractExchange { } @Override - public int getMaxSendWidth() { - return Integer.MAX_VALUE; - } + public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) { + Preconditions.checkArgument(senderFragmentEndpoints != null && senderFragmentEndpoints.size() > 0, + "Sender fragment endpoint list should not be empty"); - @Override - public int getMaxReceiveWidth() { - return 1; + return ParallelizationInfo.create(1, 1, getDefaultAffinityMap(senderFragmentEndpoints)); } @Override - protected void setupSenders(List<CoordinationProtos.DrillbitEndpoint> senderLocations) { - this.senderLocations = senderLocations; - } - - @Override - protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations) + protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { + Preconditions.checkArgument(receiverLocations.size() == 1, + "SingleMergeExchange only supports a single receiver endpoint."); - if (receiverLocations.size() != 1) { - throw new PhysicalOperatorSetupException("SingleMergeExchange only supports a single receiver endpoint"); - } - receiverLocation = receiverLocations.iterator().next(); - + super.setupReceivers(receiverLocations); } @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) { - return new SingleSender(receiverMajorFragmentId, child, receiverLocation); + return new SingleSender(receiverMajorFragmentId, child, receiverLocations.iterator().next()); } @Override public Receiver getReceiver(int minorFragmentId) { - return new MergingReceiverPOP(senderMajorFragmentId, senderLocations, orderExpr); + return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExpr); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java index 4a11a51..9745ae7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java @@ -19,7 +19,10 @@ package org.apache.drill.exec.physical.config; import java.util.Collections; import java.util.List; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractSender; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -38,23 +41,45 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class SingleSender extends AbstractSender { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSender.class); - private final DrillbitEndpoint destination; - + /** + * Create a SingleSender which sends data to fragment identified by given MajorFragmentId and MinorFragmentId, + * and running at given endpoint + * + * @param oppositeMajorFragmentId MajorFragmentId of the receiver fragment. + * @param oppositeMinorFragmentId MinorFragmentId of the receiver fragment. + * @param child Child operator + * @param destination Drillbit endpoint where the receiver fragment is running. + */ @JsonCreator - public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destination") DrillbitEndpoint destination) { - super(oppositeMajorFragmentId, child); - this.destination = destination; + public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, + @JsonProperty("receiver-minor-fragment") int oppositeMinorFragmentId, + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("destination") DrillbitEndpoint destination) { + super(oppositeMajorFragmentId, child, + Collections.singletonList(new MinorFragmentEndpoint(oppositeMinorFragmentId, destination))); + } + + /** + * Create a SingleSender which sends data to fragment with MinorFragmentId as <i>0</i> in given opposite major + * fragment. + * + * @param oppositeMajorFragmentId MajorFragmentId of the receiver fragment. + * @param child Child operator + * @param destination Drillbit endpoint where the receiver fragment is running. + */ + public SingleSender(int oppositeMajorFragmentId, PhysicalOperator child, DrillbitEndpoint destination) { + this(oppositeMajorFragmentId, 0 /* default opposite minor fragment id*/, child, destination); } @Override - @JsonIgnore - public List<DrillbitEndpoint> getDestinations() { - return Collections.singletonList(destination); + @JsonIgnore // Destination endpoint is exported via getDestination() and getOppositeMinorFragmentId() + public List<MinorFragmentEndpoint> getDestinations() { + return destinations; } @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new SingleSender(oppositeMajorFragmentId, child, destination); + return new SingleSender(oppositeMajorFragmentId, getOppositeMinorFragmentId(), child, getDestination()); } @Override @@ -62,9 +87,14 @@ public class SingleSender extends AbstractSender { return physicalVisitor.visitSingleSender(this, value); } - + @JsonProperty("destination") public DrillbitEndpoint getDestination() { - return destination; + return getDestinations().get(0).getEndpoint(); + } + + @JsonProperty("receiver-minor-fragment") + public int getOppositeMinorFragmentId() { + return getDestinations().get(0).getId(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java index cfc21ac..b7b7835 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java @@ -19,11 +19,14 @@ package org.apache.drill.exec.physical.config; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonProperty; @@ -34,44 +37,39 @@ public class UnionExchange extends AbstractExchange{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionExchange.class); - private List<DrillbitEndpoint> senderLocations; - private DrillbitEndpoint destinationLocation; - public UnionExchange(@JsonProperty("child") PhysicalOperator child) { super(child); } @Override + public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) { + Preconditions.checkArgument(senderFragmentEndpoints != null && senderFragmentEndpoints.size() > 0, + "Sender fragment endpoint list should not be empty"); + + return ParallelizationInfo.create(1, 1, getDefaultAffinityMap(senderFragmentEndpoints)); + } + + @Override public void setupSenders(List<DrillbitEndpoint> senderLocations) { this.senderLocations = senderLocations; } @Override protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { - if (receiverLocations.size() != 1) { - throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint."); - } - this.destinationLocation = receiverLocations.iterator().next(); + Preconditions.checkArgument(receiverLocations.size() == 1, + "Union Exchange only supports a single receiver endpoint."); + + super.setupReceivers(receiverLocations); } @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) { - return new SingleSender(this.receiverMajorFragmentId, child, destinationLocation); + return new SingleSender(receiverMajorFragmentId, child, receiverLocations.get(0)); } @Override public Receiver getReceiver(int minorFragmentId) { - return new UnorderedReceiver(this.senderMajorFragmentId, senderLocations); - } - - @Override - public int getMaxSendWidth() { - return Integer.MAX_VALUE; - } - - @Override - public int getMaxReceiveWidth() { - return 1; + return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations)); } @Override