http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index 3624458..d582bc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; +import java.util.Iterator; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.ExternalSort; @@ -52,17 +53,17 @@ public class SortPrel extends SortRel implements Prel { public RelOptCost computeSelfCost(RelOptPlanner planner) { if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { //We use multiplier 0.05 for TopN operator, and 0.1 for Sort, to make TopN a preferred choice. - return super.computeSelfCost(planner).multiplyBy(.1); + return super.computeSelfCost(planner).multiplyBy(.1); } - + RelNode child = this.getChild(); double inputRows = RelMetadataQuery.getRowCount(child); // int rowWidth = child.getRowType().getPrecision(); int numSortFields = this.collation.getFieldCollations().size(); - double cpuCost = DrillCostBase.COMPARE_CPU_COST * numSortFields * inputRows * (Math.log(inputRows)/Math.log(2)); + double cpuCost = DrillCostBase.COMPARE_CPU_COST * numSortFields * inputRows * (Math.log(inputRows)/Math.log(2)); double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); - return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0); + return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0); } @Override @@ -71,10 +72,6 @@ public class SortPrel extends SortRel implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); -// childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE); -// Sort g = new Sort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); - - childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE); Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); return g; @@ -88,4 +85,24 @@ public class SortPrel extends SortRel implements Prel { RexNode fetch) { return new SortPrel(getCluster(), traitSet, newInput, newCollation); } + + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.DEFAULT; // should support SV2 but there is a bug, DRILL-648 + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.FOUR_BYTE; + } }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java index 0ead488..94b70c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java @@ -38,12 +38,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; /** - * - * Rule that converts a logical {@link DrillSortRel} to a physical sort. Convert from Logical Sort into Physical Sort. - * For Logical Sort, it requires one single data stream as the output. + * + * Rule that converts a logical {@link DrillSortRel} to a physical sort. Convert from Logical Sort into Physical Sort. + * For Logical Sort, it requires one single data stream as the output. * */ -public class SortPrule extends RelOptRule{ +public class SortPrule extends Prule{ public static final RelOptRule INSTANCE = new SortPrule(); private SortPrule() { @@ -54,19 +54,24 @@ public class SortPrule extends RelOptRule{ public void onMatch(RelOptRuleCall call) { final DrillSortRel sort = (DrillSortRel) call.rel(0); final RelNode input = sort.getChild(); - - // Keep the collation in logical sort. Convert input into a RelNode with 1) this collation, 2) Physical, 3) hash distributed on - DrillDistributionTrait hashDistribution = + // Keep the collation in logical sort. Convert input into a RelNode with 1) this collation, 2) Physical, 3) hash distributed on + + DrillDistributionTrait hashDistribution = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort))); final RelTraitSet traits = sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution); - + final RelNode convertedInput = convert(input, traits); - - RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput, sort.getCollation()); - call.transformTo(exch); // transform logical "sort" into "SingleMergeExchange". - + + if(isSingleMode(call)){ + call.transformTo(convertedInput); + }else{ + RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput, sort.getCollation()); + call.transformTo(exch); // transform logical "sort" into "SingleMergeExchange". + + } + } private List<DistributionField> getDistributionField(DrillSortRel rel) { @@ -75,9 +80,9 @@ public class SortPrule extends RelOptRule{ for (RelFieldCollation relField : rel.getCollation().getFieldCollations()) { DistributionField field = new DistributionField(relField.getFieldIndex()); distFields.add(field); - } - + } + return distFields; } - + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java index 25511aa..5fb758a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.BitSet; +import java.util.Iterator; import java.util.List; import net.hydromatic.linq4j.Ord; @@ -74,8 +75,8 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{ @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { - return super.computeSelfCost(planner).multiplyBy(.1); - } + return super.computeSelfCost(planner).multiplyBy(.1); + } RelNode child = this.getChild(); double inputRows = RelMetadataQuery.getRowCount(child); @@ -85,9 +86,9 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{ // add cpu cost for computing the aggregate functions cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows; DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); - return costFactory.makeCost(inputRows, cpuCost, 0 /* disk i/o cost */, 0 /* network cost */); + return costFactory.makeCost(inputRows, cpuCost, 0 /* disk i/o cost */, 0 /* network cost */); } - + @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { final List<String> childFields = getChild().getRowType().getFieldNames(); @@ -125,4 +126,23 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{ return expr; } + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.ALL; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java index 2773bd1..3c8cfe0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java @@ -18,12 +18,14 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; +import java.util.Iterator; import java.util.List; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelNode; import org.eigenbase.rel.RelWriter; @@ -36,7 +38,7 @@ import org.eigenbase.relopt.RelTraitSet; import org.eigenbase.rex.RexLiteral; import org.eigenbase.rex.RexNode; -public class TopNPrel extends SingleRel implements Prel { +public class TopNPrel extends SinglePrel { protected int limit; protected final RelCollation collation; @@ -62,25 +64,25 @@ public class TopNPrel extends SingleRel implements Prel { return topN; } - + /** - * Cost of doing Top-N is proportional to M log N where M is the total number of - * input rows and N is the limit for Top-N. This makes Top-N preferable to Sort - * since cost of full Sort is proportional to M log M . + * Cost of doing Top-N is proportional to M log N where M is the total number of + * input rows and N is the limit for Top-N. This makes Top-N preferable to Sort + * since cost of full Sort is proportional to M log M . */ @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { - //We use multiplier 0.05 for TopN operator, and 0.1 for Sort, to make TopN a preferred choice. + if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { + //We use multiplier 0.05 for TopN operator, and 0.1 for Sort, to make TopN a preferred choice. return super.computeSelfCost(planner).multiplyBy(0.05); } RelNode child = this.getChild(); double inputRows = RelMetadataQuery.getRowCount(child); int numSortFields = this.collation.getFieldCollations().size(); - double cpuCost = DrillCostBase.COMPARE_CPU_COST * numSortFields * inputRows * (Math.log(limit)/Math.log(2)); + double cpuCost = DrillCostBase.COMPARE_CPU_COST * numSortFields * inputRows * (Math.log(limit)/Math.log(2)); double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); - return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0); + return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0); } @@ -90,4 +92,14 @@ public class TopNPrel extends SingleRel implements Prel { .item("limit", limit); } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.NONE_AND_TWO; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.FOUR_BYTE; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java index 9d3fc3d..c2cf685 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java @@ -35,37 +35,37 @@ import org.eigenbase.relopt.RelOptCost; import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelTraitSet; -public class UnionExchangePrel extends SingleRel implements Prel { +public class UnionExchangePrel extends SinglePrel { public UnionExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) { super(cluster, traitSet, input); assert input.getConvention() == Prel.DRILL_PHYSICAL; } - /** - * A UnionExchange processes a total of M rows coming from N senders and - * combines them into a single output stream. Note that there is + /** + * A UnionExchange processes a total of M rows coming from N senders and + * combines them into a single output stream. Note that there is * no sort or merge operation going on. For costing purposes, we can - * assume each sender is sending M/N rows to a single receiver. + * assume each sender is sending M/N rows to a single receiver. * (See DrillCostBase for symbol notations) - * C = CPU cost of SV remover for M/N rows - * + Network cost of sending M/N rows to 1 destination. - * So, C = (s * M/N) + (w * M/N) + * C = CPU cost of SV remover for M/N rows + * + Network cost of sending M/N rows to 1 destination. + * So, C = (s * M/N) + (w * M/N) * Total cost = N * C - */ + */ @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { - return super.computeSelfCost(planner).multiplyBy(.1); + return super.computeSelfCost(planner).multiplyBy(.1); } - + RelNode child = this.getChild(); double inputRows = RelMetadataQuery.getRowCount(child); - int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH; + int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH; double svrCpuCost = DrillCostBase.SVR_CPU_COST * inputRows; double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth; DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); - return costFactory.makeCost(inputRows, svrCpuCost, 0, networkCost); + return costFactory.makeCost(inputRows, svrCpuCost, 0, networkCost); } @Override @@ -80,11 +80,14 @@ public class UnionExchangePrel extends SingleRel implements Prel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; - //Currently, only accepts "NONE". For other, requires SelectionVectorRemover - childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE); - UnionExchange g = new UnionExchange(childPOP); return g; } + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java index 98a42de..4cefeb5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java @@ -18,11 +18,13 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; +import java.util.Iterator; import java.util.List; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.common.DrillWriterRelBase; import org.apache.drill.exec.planner.logical.CreateTableEntry; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelTraitSet; @@ -44,4 +46,24 @@ public class WriterPrel extends DrillWriterRelBase implements Prel { return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator)); } + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.DEFAULT; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java index 67ae711..42a9984 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java @@ -29,7 +29,7 @@ import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; import org.eigenbase.relopt.volcano.RelSubset; -public class WriterPrule extends RelOptRule{ +public class WriterPrule extends Prule{ public static final RelOptRule INSTANCE = new WriterPrule(); public WriterPrule() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index d107c29..a1443e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.planner.logical.DrillStoreRel; import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.SelectionVectorPrelVisitor; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptUtil; @@ -117,7 +118,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler{ Preconditions.checkArgument(drel.getConvention() == DrillRel.DRILL_LOGICAL); RelTraitSet traits = drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON); Prel phyRelNode = (Prel) planner.transform(DrillSqlWorker.PHYSICAL_MEM_RULES, traits, drel); - return phyRelNode; + return SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode); } protected PhysicalOperator convertToPop(Prel prel) throws IOException{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index ae21a3c..b4da6e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -39,12 +39,12 @@ public class BatchSchema implements Iterable<MaterializedField> { public int getFieldCount(){ return fields.size(); } - + public MaterializedField getColumn(int index){ if(index < 0 || index >= fields.size()) return null; return fields.get(index); } - + @Override public Iterator<MaterializedField> iterator() { return fields.iterator(); @@ -67,6 +67,11 @@ public class BatchSchema implements Iterable<MaterializedField> { SelectionVectorMode(int size, boolean hasSelectionVector) { this.size = size; } + + public static SelectionVectorMode[] DEFAULT = {NONE}; + public static SelectionVectorMode[] NONE_AND_TWO = {NONE, TWO_BYTE}; + public static SelectionVectorMode[] NONE_AND_FOUR = {NONE, FOUR_BYTE}; + public static SelectionVectorMode[] ALL = {NONE, TWO_BYTE, FOUR_BYTE}; } @Override @@ -96,7 +101,7 @@ public class BatchSchema implements Iterable<MaterializedField> { return false; return true; } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java index 63a5727..a11bea6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java @@ -20,7 +20,6 @@ package org.apache.drill; import org.junit.Ignore; import org.junit.Test; -@Ignore // DRILL-648 public class TestTpchSingleMode extends BaseTestQuery{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchSingleMode.class);