Repository: asterixdb Updated Branches: refs/heads/master e66346a34 -> 7a4b5681f
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7a4b5681/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java index fe11f64..54e577f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java @@ -18,29 +18,15 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.physical; -import java.util.ArrayList; import java.util.List; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.ListSet; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; -import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; @@ -51,16 +37,15 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor; -public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator { - - private List<LogicalVariable> columnList; +public class PreSortedDistinctByPOperator extends AbstractPreSortedDistinctByPOperator { public PreSortedDistinctByPOperator(List<LogicalVariable> columnList) { - this.columnList = columnList; + super(columnList); } - public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) { - this.columnList = distinctByColumns; + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY; } @Override @@ -69,66 +54,22 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator { } @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty(); - List<ILocalStructuralProperty> propsLocal = op2.getDeliveredPhysicalProperties().getLocalProperties(); - deliveredProperties = new StructuralPropertiesVector(pp, propsLocal); - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; - List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(); - List<OrderColumn> orderColumns = new ArrayList<OrderColumn>(); - for (LogicalVariable column : columnList) { - orderColumns.add(new OrderColumn(column, OrderKind.ASC)); - } - localProps.add(new LocalOrderProperty(orderColumns)); - IPartitioningProperty pp = null; - AbstractLogicalOperator aop = (AbstractLogicalOperator) op; - if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) { - pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), - context.getComputationNodeDomain()); - } - pv[0] = new StructuralPropertiesVector(pp, localProps); - return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); - } - - @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { IOperatorDescriptorRegistry spec = builder.getJobSpec(); - int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]); - int sz = inputSchemas[0].getSize(); - int fdSz = sz - columnList.size(); - int[] fdColumns = new int[fdSz]; - int j = 0; - for (LogicalVariable v : inputSchemas[0]) { - if (!columnList.contains(v)) { - fdColumns[j++] = inputSchemas[0].findVariable(v); - } - } - int[] keysAndDecs = new int[keys.length + fdColumns.length]; - for (int i = 0; i < keys.length; i++) { - keysAndDecs[i] = keys[i]; - } - for (int i = 0; i < fdColumns.length; i++) { - keysAndDecs[i + keys.length] = fdColumns[i]; - } + int[] keysAndDecs = getKeysAndDecs(inputSchemas[0]); IBinaryComparatorFactory[] comparatorFactories = JobGenHelper .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context); IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {}; - IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories, - keysAndDecs); + IAggregatorDescriptorFactory aggregatorFactory = + new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories, keysAndDecs); - RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, - context); - /** make fd columns part of the key but the comparator only compares the distinct key columns */ + RecordDescriptor recordDescriptor = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + /* make fd columns part of the key but the comparator only compares the distinct key columns */ PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs, comparatorFactories, aggregatorFactory, recordDescriptor); @@ -137,14 +78,4 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator { ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); } - - @Override - public PhysicalOperatorTag getOperatorTag() { - return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY; - } - - @Override - public boolean expensiveThanMaterialization() { - return true; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7a4b5681/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java index a403211..2870074 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java @@ -32,6 +32,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; @@ -59,7 +60,7 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue(); afce.setFunctionInfo(aei.newFunInfo); afce.getArguments().clear(); - afce.getArguments().add(new MutableObject<ILogicalExpression>(sai.stepOneResult)); + afce.getArguments().add(new MutableObject<>(sai.stepOneResult)); } } } @@ -68,9 +69,6 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite GroupByOperator newGbyOp, Set<SimilarAggregatesInfo> toReplaceSet, IOptimizationContext context) throws AlgebricksException { - ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>(); - ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>(); - List<LogicalVariable> initVars = initAgg.getVariables(); List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions(); int numExprs = initVars.size(); @@ -79,20 +77,22 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite for (int i = 0; i < numExprs; i++) { AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) initExprs.get(i).getValue(); if (!aggFun.isTwoStep()) { - return new Pair<Boolean, Mutable<ILogicalOperator>>(false, null); + return new Pair<>(false, null); } } + ArrayList<LogicalVariable> pushedVars = new ArrayList<>(); + ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<>(); + boolean haveAggToReplace = false; for (int i = 0; i < numExprs; i++) { Mutable<ILogicalExpression> expRef = initExprs.get(i); AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue(); IFunctionInfo fi1 = aggFun.getStepOneAggregate(); // Clone the aggregate's args. - List<Mutable<ILogicalExpression>> newArgs = new ArrayList<Mutable<ILogicalExpression>>(aggFun - .getArguments().size()); + List<Mutable<ILogicalExpression>> newArgs = new ArrayList<>(aggFun.getArguments().size()); for (Mutable<ILogicalExpression> er : aggFun.getArguments()) { - newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression())); + newArgs.add(new MutableObject<>(er.getValue().cloneExpression())); } IFunctionInfo fi2 = aggFun.getStepTwoAggregate(); @@ -100,10 +100,10 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite LogicalVariable newAggVar = context.newVar(); pushedVars.add(newAggVar); inf.stepOneResult = new VariableReferenceExpression(newAggVar); - inf.simAggs = new ArrayList<AggregateExprInfo>(); + inf.simAggs = new ArrayList<>(); toReplaceSet.add(inf); AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs); - pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal)); + pushedExprs.add(new MutableObject<>(aggLocal)); AggregateExprInfo aei = new AggregateExprInfo(); aei.aggExprRef = expRef; aei.newFunInfo = fi2; @@ -118,34 +118,43 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite if (newGbyOp != null) { // Cut and paste nested input pipelines of initAgg to pushedAgg's input Mutable<ILogicalOperator> inputRef = initAgg.getInputs().get(0); + if (!isPushableInput(inputRef.getValue())) { + return new Pair<>(false, null); + } Mutable<ILogicalOperator> bottomRef = inputRef; while (bottomRef.getValue().getInputs().size() > 0) { bottomRef = bottomRef.getValue().getInputs().get(0); + if (!isPushableInput(bottomRef.getValue())) { + return new Pair<>(false, null); + } } ILogicalOperator oldNts = bottomRef.getValue(); initAgg.getInputs().clear(); - initAgg.getInputs().add(new MutableObject<ILogicalOperator>(oldNts)); + initAgg.getInputs().add(new MutableObject<>(oldNts)); // Hook up the nested aggregate op with the outer group by. - NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>( - newGbyOp)); + NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<>(newGbyOp)); nts.setExecutionMode(ExecutionMode.LOCAL); bottomRef.setValue(nts); pushedAgg.getInputs().add(inputRef); } else { // The local aggregate operator is fed by the input of the original aggregate operator. - pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue())); + pushedAgg.getInputs().add(new MutableObject<>(initAgg.getInputs().get(0).getValue())); // Reintroduce assign op for the global agg partitioning var. initAgg.getInputs().get(0).setValue(pushedAgg); pushedAgg.setGlobal(false); context.computeAndSetTypeEnvironmentForOperator(pushedAgg); } - return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg)); + return new Pair<>(true, new MutableObject<ILogicalOperator>(pushedAgg)); } else { - return new Pair<Boolean, Mutable<ILogicalOperator>>(haveAggToReplace, null); + return new Pair<>(haveAggToReplace, null); } } + protected boolean isPushableInput(ILogicalOperator op) { + return op.getOperatorTag() != LogicalOperatorTag.DISTINCT; + } + protected class SimilarAggregatesInfo { ILogicalExpression stepOneResult; List<AggregateExprInfo> simAggs; @@ -157,6 +166,6 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite } protected class BookkeepingInfo { - Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>(); + Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<>(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7a4b5681/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 16ed9cb..84961d6 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -65,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsert import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; @@ -135,7 +136,12 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule } case DISTINCT: { DistinctOperator distinct = (DistinctOperator) op; - distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList())); + if (topLevelOp) { + distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList())); + } else { + distinct.setPhysicalOperator( + new MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList())); + } break; } case EMPTYTUPLESOURCE: {
