DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
DRILL-6115: Refactoring the existing code. close apache/drill#1110 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f203ea9 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f203ea9 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f203ea9 Branch: refs/heads/master Commit: 4f203ea99e2d6bd5596672a0ea67445720630228 Parents: 8cb3234 Author: Hanumath Rao Maduri <hanu....@gmail.com> Authored: Mon Feb 12 18:19:07 2018 -0800 Committer: Aman Sinha <asi...@maprtech.com> Committed: Fri Feb 23 14:13:40 2018 -0800 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 + .../physical/base/AbstractPhysicalVisitor.java | 18 -- .../exec/physical/base/PhysicalVisitor.java | 7 - .../physical/config/AbstractMuxExchange.java | 15 ++ .../physical/config/OrderedMuxExchange.java | 52 +++++ .../physical/config/UnorderedMuxExchange.java | 9 +- .../exec/planner/physical/ExchangePrel.java | 13 ++ .../exec/planner/physical/HashPrelUtil.java | 23 ++- .../physical/HashToMergeExchangePrel.java | 4 - .../physical/HashToRandomExchangePrel.java | 77 ++++++++ .../physical/OrderedMuxExchangePrel.java | 74 +++++++ .../exec/planner/physical/PlannerSettings.java | 1 + .../physical/SingleMergeExchangePrel.java | 21 ++ .../visitor/InsertLocalExchangeVisitor.java | 115 +---------- .../planner/physical/visitor/PrelVisitor.java | 1 - .../server/options/SystemOptionManager.java | 1 + .../src/main/resources/drill-module.conf | 1 + .../physical/impl/TestOrderedMuxExchange.java | 193 +++++++++++++++++++ 18 files changed, 479 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index a1a94fa..fb2907d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -455,6 +455,9 @@ public final class ExecConstants { public static final String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width"; public static final OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE); + // Mux Exchange options. + public static final String ORDERED_MUX_EXCHANGE = "planner.enable_ordered_mux_exchange"; + // Resource management boot-time options. public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node"; http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index a3b5f27..9933ddc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -23,7 +23,6 @@ import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.HashToRandomExchange; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.config.Limit; import 
org.apache.drill.exec.physical.config.MergeJoinPOP; @@ -32,14 +31,12 @@ import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RangeSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; -import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.config.WindowPOP; @@ -176,16 +173,6 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override - public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E { - return visitExchange(op, value); - } - - @Override - public T visitRangeSender(RangeSender op, X value) throws E { - return visitSender(op, value); - } - - @Override public T visitBroadcastSender(BroadcastSender op, X value) throws E { return visitSender(op, value); } @@ -201,11 +188,6 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override - public T visitUnionExchange(UnionExchange op, X value) throws E { - return visitExchange(op, value); - } - - @Override public T visitProducerConsumer(ProducerConsumer op, X value) throws E { return visitOp(op, value); } http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index ae6ff60..0eadb79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -23,7 +23,6 @@ import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.HashToRandomExchange; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.MergeJoinPOP; @@ -32,14 +31,12 @@ import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RangeSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; -import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.config.WindowPOP; @@ -58,7 +55,6 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitGroupScan(GroupScan groupScan, EXTRA value) throws EXCEP; public RETURN visitSubScan(SubScan 
subScan, EXTRA value) throws EXCEP; public RETURN visitStore(Store store, EXTRA value) throws EXCEP; - public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP; public RETURN visitUnion(UnionAll union, EXTRA value) throws EXCEP; public RETURN visitProject(Project project, EXTRA value) throws EXCEP; @@ -81,12 +77,9 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA value) throws EXCEP; public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) throws EXCEP; public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) throws EXCEP; - public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP; - public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP; public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws EXCEP; public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; - public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP; public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java index 6715f96..5a76dec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java @@ -29,6 +29,7 @@ import 
org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; import org.apache.drill.exec.planner.fragment.ParallelizationInfo; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -51,6 +52,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; * for the 10 fragments is 300 instead of earlier number 10*300. */ public abstract class AbstractMuxExchange extends AbstractExchange { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMuxExchange.class); // Ephemeral info used when creating execution fragments. protected Map<Integer, MinorFragmentEndpoint> senderToReceiverMapping; @@ -90,6 +92,19 @@ public abstract class AbstractMuxExchange extends AbstractExchange { return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint()); } + protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) { + createSenderReceiverMapping(); + + List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId); + + logger.debug("Minor fragment {}, receives data from following senders {}", minorFragmentId, senders); + if (senders == null || senders.size() <= 0) { + throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId)); + } + + return senders; + } + protected void createSenderReceiverMapping() { if (isSenderReceiverMappingCreated) { return; http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java new file mode 100644 index 0000000..2b9653a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.config; + +import java.util.List; + +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Receiver; +import org.apache.drill.common.logical.data.Order.Ordering; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted and a + * merge operation is performed to produce a sorted stream as output. 
+ */ +@JsonTypeName("ordered-mux-exchange") +public class OrderedMuxExchange extends AbstractMuxExchange { + + private final List<Ordering> orderings; + + public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) { + super(child); + this.orderings = orderings; + } + + @Override + public Receiver getReceiver(int minorFragmentId) { + return new MergingReceiverPOP(senderMajorFragmentId, getSenders(minorFragmentId), orderings, false); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new OrderedMuxExchange(child, orderings); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java index 46f1fd7..fed8e54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java @@ -38,14 +38,7 @@ public class UnorderedMuxExchange extends AbstractMuxExchange { @Override public Receiver getReceiver(int minorFragmentId) { - createSenderReceiverMapping(); - - List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId); - if (senders == null || senders.size() <= 0) { - throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId)); - } - - return new UnorderedReceiver(senderMajorFragmentId, senders, false); + return new UnorderedReceiver(senderMajorFragmentId, getSenders(minorFragmentId), false); } @Override 
http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java index 9ad1c67..f3f87ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java @@ -22,6 +22,9 @@ import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; +import org.apache.drill.exec.server.options.OptionManager; + +import java.util.Collections; public abstract class ExchangePrel extends SinglePrel{ @@ -34,4 +37,14 @@ public abstract class ExchangePrel extends SinglePrel{ return logicalVisitor.visitExchange(this, value); } + /** + * The derived classes can override this method to create relevant mux exchanges. + * If this method is not overridden, the default behaviour is to clone itself. + * @param child input to the new muxPrel or new Exchange node. + * @param options options manager to check if mux is enabled. 
+ */ + public Prel constructMuxPrel(Prel child, OptionManager options) { + return (Prel)copy(getTraitSet(), Collections.singletonList(((RelNode)child))); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java index 25b7017..9c12cc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java @@ -19,19 +19,15 @@ package org.apache.drill.exec.planner.physical; import com.google.common.collect.ImmutableList; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.ValueExpressions; -import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.expr.DirectExpression; -import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; -import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.planner.sql.DrillSqlOperator; import java.util.ArrayList; import java.util.List; @@ -63,6 +59,21 @@ public class HashPrelUtil { } }; + public static class RexNodeBasedHashExpressionCreatorHelper implements 
HashExpressionCreatorHelper<RexNode> { + private final RexBuilder rexBuilder; + + public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) { + this.rexBuilder = rexBuilder; + } + + @Override + public RexNode createCall(String funcName, List<RexNode> inputFields) { + final DrillSqlOperator op = + new DrillSqlOperator(funcName, inputFields.size(), true, false); + return rexBuilder.makeCall(op, inputFields); + } + } + // The hash32 functions actually use hash64 underneath. The reason we want to call hash32 is that // the hash based operators make use of 4 bytes of hash value, not 8 bytes (for reduced memory use). private static final String HASH32_FUNCTION_NAME = "hash32"; http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java index 671eb61..2272a9b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java @@ -90,10 +90,6 @@ public class HashToMergeExchangePrel extends ExchangePrel { } - public List<DistributionField> getDistFields() { - return this.distFields; - } - public RelCollation getCollation() { return this.collation; } http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java index b8a4001..2254c56 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java @@ -18,10 +18,17 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; +import java.math.BigDecimal; import java.util.List; +import com.google.common.collect.Lists; import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.HashToRandomExchange; import org.apache.drill.exec.planner.cost.DrillCostBase; @@ -35,6 +42,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.drill.exec.server.options.OptionManager; public class HashToRandomExchangePrel extends ExchangePrel { @@ -112,6 +120,75 @@ public class HashToRandomExchangePrel extends ExchangePrel { return pw; } + /** + * This method creates a new UnorderedMux and Demux exchanges if mux operators are enabled. + * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node. + * @param options options manager to check if mux is enabled. 
+ */ + @Override + public Prel constructMuxPrel(Prel child, OptionManager options) { + boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val; + Prel newPrel = child; + + final List<String> childFields = child.getRowType().getFieldNames(); + + List <RexNode> removeUpdatedExpr = null; + + if (isMuxEnabled) { + // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields + final List<DistributionField> distFields = getFields(); + final List<String> outputFieldNames = Lists.newArrayList(childFields); + final RexBuilder rexBuilder = getCluster().getRexBuilder(); + final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList(); + + final HashPrelUtil.HashExpressionCreatorHelper<RexNode> hashHelper = + new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder); + + final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size()); + for (int i=0; i<distFields.size(); i++) { + final int fieldId = distFields.get(i).getFieldId(); + distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(), fieldId)); + } + + final List <RexNode> updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size()); + removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size()); + for (RelDataTypeField field : childRowTypeFields) { + RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex()); + updatedExpr.add(rex); + removeUpdatedExpr.add(rex); + } + + outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME); + final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed + updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper)); + + RelDataType rowType = RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr, outputFieldNames); + + ProjectPrel addColumnprojectPrel = new 
ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType); + + newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(), + addColumnprojectPrel); + } + + newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(), newPrel, getFields()); + + if (options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) { + HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel; + // Insert a DeMuxExchange to narrow down the number of receivers + newPrel = new UnorderedDeMuxExchangePrel(getCluster(), getTraitSet(), hashExchangePrel, hashExchangePrel.getFields()); + } + + if (isMuxEnabled) { + // remove earlier inserted Project Operator - since it creates issues down the road in HashJoin + RelDataType removeRowType = RexUtil.createStructType(newPrel.getCluster().getTypeFactory(), removeUpdatedExpr, childFields); + + ProjectPrel removeColumnProjectPrel = new ProjectPrel(newPrel.getCluster(), newPrel.getTraitSet(), + newPrel, removeUpdatedExpr, removeRowType); + return removeColumnProjectPrel; + } + return newPrel; + } + @Override public SelectionVectorMode getEncoding() { return SelectionVectorMode.NONE; http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java new file mode 100644 index 0000000..de9ff25 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedMuxExchangePrel.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical; + +import java.io.IOException; +import java.util.List; + +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.OrderedMuxExchange; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.linq4j.Ord; + + +/** + * OrderedMuxExchangePrel is a mux exchange created to multiplex the streams for a MergeReceiver. 
+ */ +public class OrderedMuxExchangePrel extends ExchangePrel { + private final RelCollation fieldCollation; + + public OrderedMuxExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation) { + super(cluster, traits, child); + this.fieldCollation = collation; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new OrderedMuxExchangePrel(getCluster(), traitSet, sole(inputs), fieldCollation); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getInput(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + + OrderedMuxExchange p = new OrderedMuxExchange(childPOP, PrelUtil.getOrdering(fieldCollation, getInput().getRowType())); + return creator.addMetadata(this, p); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + super.explainTerms(pw); + for (Ord<RelFieldCollation> ord : Ord.zip(this.fieldCollation.getFieldCollations())) { + pw.item("sort" + ord.i, ord.e); + } + return pw; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 74ef601..1271ff9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -66,6 +66,7 @@ public class PlannerSettings implements Context{ public static final OptionValidator NLJOIN_FOR_SCALAR = new 
BooleanValidator("planner.enable_nljoin_for_scalar_only"); public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE); public static final OptionValidator MUX_EXCHANGE = new BooleanValidator("planner.enable_mux_exchange"); + public static final OptionValidator ORDERED_MUX_EXCHANGE = new BooleanValidator("planner.enable_ordered_mux_exchange"); public static final OptionValidator DEMUX_EXCHANGE = new BooleanValidator("planner.enable_demux_exchange"); public static final OptionValidator PARTITION_SENDER_THREADS_FACTOR = new LongValidator("planner.partitioner_sender_threads_factor"); public static final OptionValidator PARTITION_SENDER_MAX_THREADS = new LongValidator("planner.partitioner_sender_max_threads"); http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java index 26f7074..a03c2f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java @@ -36,6 +36,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.drill.exec.server.options.OptionManager; public class SingleMergeExchangePrel extends ExchangePrel { @@ -93,6 +94,22 @@ public class SingleMergeExchangePrel extends ExchangePrel { return creator.addMetadata(this, g); } + /** + * This method creates a new OrderedMux exchange if mux operators are 
enabled. + * @param child input to the new muxPrel or new SingleMergeExchange node. + * @param options options manager to check if mux is enabled. + */ + @Override + public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException { + Prel outPrel = child; + if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val && + options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val) { + outPrel = new OrderedMuxExchangePrel(getCluster(), getTraitSet(), getInput(), getCollation()); + } + + return new SingleMergeExchangePrel(getCluster(), getTraitSet(), outPrel, getCollation()); + } + @Override public RelWriter explainTerms(RelWriter pw) { super.explainTerms(pw); @@ -106,6 +123,10 @@ public class SingleMergeExchangePrel extends ExchangePrel { return pw; } + public RelCollation getCollation() { + return this.collation; + } + @Override public SelectionVectorMode getEncoding() { return SelectionVectorMode.NONE; http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java index fba4cbe..3d6e0be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java @@ -20,133 +20,38 @@ package org.apache.drill.exec.planner.physical.visitor; import com.google.common.collect.Lists; import org.apache.drill.exec.planner.physical.ExchangePrel; -import org.apache.drill.exec.planner.physical.HashPrelUtil; -import 
org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper; -import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.Prel; -import org.apache.drill.exec.planner.physical.ProjectPrel; -import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; -import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel; -import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel; -import org.apache.drill.exec.planner.sql.DrillSqlOperator; import org.apache.drill.exec.server.options.OptionManager; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexUtil; - -import java.math.BigDecimal; -import java.util.Collections; import java.util.List; public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> { - private final boolean isMuxEnabled; - private final boolean isDeMuxEnabled; - - - public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> { - private final RexBuilder rexBuilder; + private final OptionManager options; - public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) { - this.rexBuilder = rexBuilder; - } - - @Override - public RexNode createCall(String funcName, List<RexNode> inputFields) { - final DrillSqlOperator op = - new DrillSqlOperator(funcName, inputFields.size(), true, false); - return rexBuilder.makeCall(op, inputFields); - } + private static boolean isMuxEnabled(OptionManager options) { + return (options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val || + options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val || + 
options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val); } public static Prel insertLocalExchanges(Prel prel, OptionManager options) { - boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val; - boolean isDeMuxEnabled = options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val; - if (isMuxEnabled || isDeMuxEnabled) { - return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, isDeMuxEnabled), null); + if (isMuxEnabled(options)) { + return prel.accept(new InsertLocalExchangeVisitor(options), null); } return prel; } - public InsertLocalExchangeVisitor(boolean isMuxEnabled, boolean isDeMuxEnabled) { - this.isMuxEnabled = isMuxEnabled; - this.isDeMuxEnabled = isDeMuxEnabled; + public InsertLocalExchangeVisitor(OptionManager options) { + this.options = options; } @Override public Prel visitExchange(ExchangePrel prel, Void value) throws RuntimeException { Prel child = ((Prel)prel.getInput()).accept(this, null); - // Whenever we encounter a HashToRandomExchangePrel - // If MuxExchange is enabled, insert a UnorderedMuxExchangePrel before HashToRandomExchangePrel. - // If DeMuxExchange is enabled, insert a UnorderedDeMuxExchangePrel after HashToRandomExchangePrel. 
- if (!(prel instanceof HashToRandomExchangePrel)) { - return (Prel)prel.copy(prel.getTraitSet(), Collections.singletonList(((RelNode)child))); - } - - Prel newPrel = child; - - final HashToRandomExchangePrel hashPrel = (HashToRandomExchangePrel) prel; - final List<String> childFields = child.getRowType().getFieldNames(); - - List <RexNode> removeUpdatedExpr = null; - - if (isMuxEnabled) { - // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields - final List<DistributionField> distFields = hashPrel.getFields(); - final List<String> outputFieldNames = Lists.newArrayList(childFields); - final RexBuilder rexBuilder = prel.getCluster().getRexBuilder(); - final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList(); - - final HashExpressionCreatorHelper<RexNode> hashHelper = new RexNodeBasedHashExpressionCreatorHelper(rexBuilder); - final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size()); - for(int i=0; i<distFields.size(); i++) { - final int fieldId = distFields.get(i).getFieldId(); - distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(), fieldId)); - } - - final List <RexNode> updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size()); - removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size()); - for ( RelDataTypeField field : childRowTypeFields) { - RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex()); - updatedExpr.add(rex); - removeUpdatedExpr.add(rex); - } - - outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME); - final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed - updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper)); - - RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), updatedExpr, outputFieldNames); - - ProjectPrel 
addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType); - - newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(), - addColumnprojectPrel); - } - - newPrel = new HashToRandomExchangePrel(prel.getCluster(), - prel.getTraitSet(), newPrel, ((HashToRandomExchangePrel) prel).getFields()); - - if (isDeMuxEnabled) { - HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel; - // Insert a DeMuxExchange to narrow down the number of receivers - newPrel = new UnorderedDeMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), hashExchangePrel, - hashExchangePrel.getFields()); - } - - if ( isMuxEnabled ) { - // remove earlier inserted Project Operator - since it creates issues down the road in HashJoin - RelDataType removeRowType = RexUtil.createStructType(newPrel.getCluster().getTypeFactory(), removeUpdatedExpr, childFields); - - ProjectPrel removeColumnProjectPrel = new ProjectPrel(newPrel.getCluster(), newPrel.getTraitSet(), newPrel, removeUpdatedExpr, removeRowType); - return removeColumnProjectPrel; - } - return newPrel; + return prel.constructMuxPrel(child, options); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java index b3a25c1..0e4d05c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java @@ -35,7 +35,6 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitScan(ScanPrel prel, 
EXTRA value) throws EXCEP; public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP; public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP; - public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP; } http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 2b170e7..d18acb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -91,6 +91,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(PlannerSettings.NLJOIN_FOR_SCALAR), new OptionDefinition(PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR), new OptionDefinition(PlannerSettings.MUX_EXCHANGE), + new OptionDefinition(PlannerSettings.ORDERED_MUX_EXCHANGE), new OptionDefinition(PlannerSettings.DEMUX_EXCHANGE), new OptionDefinition(PlannerSettings.PRODUCER_CONSUMER), new OptionDefinition(PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE), http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 39320c2..2305a30 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -479,6 +479,7 @@ drill.exec.options: { planner.enable_mergejoin: true, planner.enable_multiphase_agg: true, planner.enable_mux_exchange: true, + 
planner.enable_ordered_mux_exchange: true, planner.enable_nestedloopjoin: true, planner.enable_nljoin_for_scalar_only: true, planner.enable_streamagg: true, http://git-wip-us.apache.org/repos/asf/drill/blob/4f203ea9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java new file mode 100644 index 0000000..cee36fa --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClientFixture; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.PrintWriter; +import java.nio.file.Paths; + +import static org.junit.Assert.assertTrue; + +public class TestOrderedMuxExchange extends PlanTestBase { + + private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange"; + private final static String TOPN = "TopN"; + private final static int NUM_DEPTS = 40; + private final static int NUM_EMPLOYEES = 1000; + private final static int NUM_MNGRS = 1; + private final static int NUM_IDS = 1; + private final static String MATCH_PATTERN_ACROSS_LINES = "((?s).*[\\n\\r].*)"; + private static final String EMPT_TABLE = "empTable"; + + /** + * Generate data for the employee table. The table consists of several JSON files. 
+ */ + @BeforeClass + public static void generateTestDataAndQueries() throws Exception { + // Table consists of five columns: "emp_id", "emp_name", "dept_id", "mng_id" and "some_id" + final File empTableLocation = dirTestWatcher.makeRootSubDir(Paths.get(EMPT_TABLE)); + + // Write 100 records for each new file + final int empNumRecsPerFile = 100; + for(int fileIndex=0; fileIndex<NUM_EMPLOYEES/empNumRecsPerFile; fileIndex++) { + File file = new File(empTableLocation, fileIndex + ".json"); + PrintWriter printWriter = new PrintWriter(file); + for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile; recordIndex++) { + String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d, \"mng_id\" : %d, \"some_id\" : %d }", + recordIndex, recordIndex, recordIndex % NUM_DEPTS, recordIndex % NUM_MNGRS, recordIndex % NUM_IDS); + printWriter.println(record); + } + printWriter.close(); + } + } + + /** + * Test case to verify the OrderedMuxExchange created for order by clause. + * It forces a plan that contains OrderedMuxExchange and also verifies that the + * results match the same query run with OrderedMuxExchange disabled. + * + * @throws Exception if anything goes wrong + */ + + @Test + public void testOrderedMuxForOrderBy() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .maxParallelization(1) + .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client.alterSession(ExecConstants.SLICE_TARGET, 10); + String sql = "SELECT emp_id, emp_name FROM dfs.`empTable` e order BY emp_name, emp_id"; + client.testBuilder() + .unOrdered() + .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 10;") + .sqlQuery(sql) + .optionSettingQueriesForBaseline("alter session set `planner.enable_ordered_mux_exchange` = false") // Baseline: same query with OrderedMuxExchange disabled. 
+ .sqlBaselineQuery(sql) + .build() + .run(); + client.alterSession(ExecConstants.ORDERED_MUX_EXCHANGE, true); + String explainText = client.queryBuilder().sql(sql).explainText(); + assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE)); + } + } + + /** + * Test case to verify the OrderedMuxExchange created for window functions. + * It forces a plan that contains OrderedMuxExchange and also verifies that the + * results match the same query run with OrderedMuxExchange disabled. + * + * @throws Exception if anything goes wrong + */ + + @Test + public void testOrderedMuxForWindowAgg() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .maxParallelization(1) + .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client.alterSession(ExecConstants.SLICE_TARGET, 10); + String sql = "SELECT emp_name, max(emp_id) over (order by emp_name) FROM dfs.`empTable` e order BY emp_name"; + client.testBuilder() + .unOrdered() + .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 10;") + .sqlQuery(sql) + .optionSettingQueriesForBaseline("alter session set `planner.enable_ordered_mux_exchange` = false") // Baseline: same query with OrderedMuxExchange disabled. + .sqlBaselineQuery(sql) + .build() + .run(); + client.alterSession(ExecConstants.ORDERED_MUX_EXCHANGE, true); + String explainText = client.queryBuilder().sql(sql).explainText(); + assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE)); + } + } + + + /** + * Test case to verify the OrderedMuxExchange created for order by with limit. + * It checks that the limit is pushed down below the OrderedMuxExchange. 
+ * + * @throws Exception if anything goes wrong + */ + @Test + public void testLimitOnOrderedMux() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .maxParallelization(1) + .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client.alterSession(ExecConstants.SLICE_TARGET, 10); + String sql = "SELECT emp_id, emp_name FROM dfs.`empTable` e order BY emp_name, emp_id limit 10"; + client.testBuilder() + .unOrdered() + .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 10;") + .sqlQuery(sql) + .optionSettingQueriesForBaseline("alter session set `planner.enable_ordered_mux_exchange` = false") // Baseline: same query with OrderedMuxExchange disabled. + .sqlBaselineQuery(sql) + .build() + .run(); + client.alterSession(ExecConstants.ORDERED_MUX_EXCHANGE, true); + String explainText = client.queryBuilder().sql(sql).explainText(); + assertTrue(explainText.matches(String.format(MATCH_PATTERN_ACROSS_LINES + "%s"+ + MATCH_PATTERN_ACROSS_LINES +"%s"+MATCH_PATTERN_ACROSS_LINES,ORDERED_MUX_EXCHANGE, TOPN))); + } + } + + /** + * Test case to verify the OrderedMuxExchange created for order by with limit and window agg. + * It checks that the limit is pushed down below the OrderedMuxExchange. 
+ * + * @throws Exception if anything goes wrong + */ + @Test + public void testLimitOnOrderedMuxWindow() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .maxParallelization(1) + .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client.alterSession(ExecConstants.SLICE_TARGET, 10); + String sql = "SELECT emp_name, max(emp_id) over (order by emp_name) FROM dfs.`empTable` e order BY emp_name limit 10"; + client.testBuilder() + .unOrdered() + .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 10;") + .sqlQuery(sql) + .optionSettingQueriesForBaseline("alter session set `planner.enable_ordered_mux_exchange` = false") // Baseline: same query with OrderedMuxExchange disabled. + .sqlBaselineQuery(sql) + .build() + .run(); + client.alterSession(ExecConstants.ORDERED_MUX_EXCHANGE, true); + String explainText = client.queryBuilder().sql(sql).explainText(); + assertTrue(explainText.matches(String.format(MATCH_PATTERN_ACROSS_LINES + "%s"+ + MATCH_PATTERN_ACROSS_LINES +"%s"+MATCH_PATTERN_ACROSS_LINES,ORDERED_MUX_EXCHANGE, TOPN))); + } + } +}