http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SubplanOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SubplanOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SubplanOperator.java index b805761..72150a9 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SubplanOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SubplanOperator.java @@ -31,12 +31,8 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; -import org.apache.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy; import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; -import org.apache.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; -import org.apache.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer; -import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -94,21 +90,6 @@ public class SubplanOperator extends AbstractOperatorWithNestedPlans { @Override public 
IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { - int n = 0; - for (ILogicalPlan p : nestedPlans) { - n += p.getRoots().size(); - } - ITypeEnvPointer[] envPointers = new ITypeEnvPointer[n + 1]; - envPointers[0] = new OpRefTypeEnvPointer(inputs.get(0), ctx); - int i = 1; - for (ILogicalPlan p : nestedPlans) { - for (Mutable<ILogicalOperator> r : p.getRoots()) { - envPointers[i] = new OpRefTypeEnvPointer(r, ctx); - i++; - } - } - return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMissableTypeComputer(), - ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers); + return createNestedPlansPropagatingTypeEnvironment(ctx, true); /* Delegates to a helper defined outside this diff; it replaces the removed inline code that built a PropagatingTypeEnvironment (TypePropagationPolicy.ALL) over inputs.get(0) plus every nested-plan root. NOTE(review): presumably the helper builds the same environment and `true` keeps input 0 in it - confirm against the helper. */ } - }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java index aa1791f..e09b471 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java @@ -20,12 +20,15 @@ package org.apache.hyracks.algebricks.core.algebra.operators.logical; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; @@ -39,22 +42,54 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisit * <ul> * <li>{@link #partitionExpressions} - define how input data must be partitioned</li> * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered</li> - * <li>{@link #expressions} - 
window function expressions (running aggregates)</li> + * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion</li> + * <li>{@link #frameStartExpressions} - frame start boundary</li> + * <li>{@link #frameEndExpressions} - frame end boundary</li> + * <li>{@link #frameExcludeExpressions} - define values to be excluded from the frame</li> + * <li>{@link #frameOffset} - sets how many tuples to skip inside each frame</li> + * <li>{@link #frameMaxObjects} - limits number of tuples to be returned for each frame</li> * <li>{@link #variables} - output variables containing return values of these functions</li> + * <li>{@link #expressions} - window function expressions (running aggregates)</li> * </ul> * * Window operator does not change cardinality of the input stream. */ -public class WindowOperator extends AbstractAssignOperator { +public class WindowOperator extends AbstractOperatorWithNestedPlans { private final List<Mutable<ILogicalExpression>> partitionExpressions; private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions; + private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExpressions; + + private final List<Mutable<ILogicalExpression>> frameStartExpressions; + + private final List<Mutable<ILogicalExpression>> frameEndExpressions; + + private final List<Mutable<ILogicalExpression>> frameExcludeExpressions; + + private final int frameExcludeNegationStartIdx; + + private final Mutable<ILogicalExpression> frameOffset; + + private int frameMaxObjects; + + private final List<LogicalVariable> variables; + + private final List<Mutable<ILogicalExpression>> expressions; + + public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions, + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions) { + this(partitionExpressions, orderExpressions, null, null, null, null, -1, null, -1); + } + public 
WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions, List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions, - List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) { - super(variables, expressions); + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExpressions, + List<Mutable<ILogicalExpression>> frameStartExpressions, + List<Mutable<ILogicalExpression>> frameEndExpressions, + List<Mutable<ILogicalExpression>> frameExcludeExpressions, int frameExcludeNegationStartIdx, + ILogicalExpression frameOffset, int frameMaxObjects) { this.partitionExpressions = new ArrayList<>(); if (partitionExpressions != null) { this.partitionExpressions.addAll(partitionExpressions); @@ -63,6 +98,48 @@ public class WindowOperator extends AbstractAssignOperator { if (orderExpressions != null) { this.orderExpressions.addAll(orderExpressions); } + this.frameValueExpressions = new ArrayList<>(); + if (frameValueExpressions != null) { + this.frameValueExpressions.addAll(frameValueExpressions); + } + this.frameStartExpressions = new ArrayList<>(); + if (frameStartExpressions != null) { + this.frameStartExpressions.addAll(frameStartExpressions); + } + this.frameEndExpressions = new ArrayList<>(); + if (frameEndExpressions != null) { + this.frameEndExpressions.addAll(frameEndExpressions); + } + this.frameExcludeExpressions = new ArrayList<>(); + if (frameExcludeExpressions != null) { + this.frameExcludeExpressions.addAll(frameExcludeExpressions); + } + this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx; + this.frameOffset = new MutableObject<>(frameOffset); + this.variables = new ArrayList<>(); + this.expressions = new ArrayList<>(); + setFrameMaxObjects(frameMaxObjects); + } + + public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions, + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions, + List<Pair<OrderOperator.IOrder, 
Mutable<ILogicalExpression>>> frameValueExpressions, + List<Mutable<ILogicalExpression>> frameStartExpressions, + List<Mutable<ILogicalExpression>> frameEndExpressions, + List<Mutable<ILogicalExpression>> frameExcludeExpressions, int frameExcludeNegationStartIdx, + ILogicalExpression frameOffset, int frameMaxObjects, List<LogicalVariable> variables, + List<Mutable<ILogicalExpression>> expressions, List<ILogicalPlan> nestedPlans) { + this(partitionExpressions, orderExpressions, frameValueExpressions, frameStartExpressions, frameEndExpressions, + frameExcludeExpressions, frameExcludeNegationStartIdx, frameOffset, frameMaxObjects); + if (variables != null) { + this.variables.addAll(variables); + } + if (expressions != null) { + this.expressions.addAll(expressions); + } + if (nestedPlans != null) { + this.nestedPlans.addAll(nestedPlans); + } } @Override @@ -70,12 +147,63 @@ public class WindowOperator extends AbstractAssignOperator { return LogicalOperatorTag.WINDOW; } + public List<Mutable<ILogicalExpression>> getPartitionExpressions() { + return partitionExpressions; + } + public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getOrderExpressions() { return orderExpressions; } - public List<Mutable<ILogicalExpression>> getPartitionExpressions() { - return partitionExpressions; + public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getFrameValueExpressions() { + return frameValueExpressions; + } + + public List<Mutable<ILogicalExpression>> getFrameStartExpressions() { + return frameStartExpressions; + } + + public List<Mutable<ILogicalExpression>> getFrameEndExpressions() { + return frameEndExpressions; + } + + public List<Mutable<ILogicalExpression>> getFrameExcludeExpressions() { + return frameExcludeExpressions; + } + + public int getFrameExcludeNegationStartIdx() { + return frameExcludeNegationStartIdx; + } + + public Mutable<ILogicalExpression> getFrameOffset() { + return frameOffset; + } + + public int getFrameMaxObjects() { + 
return frameMaxObjects; + } + + public void setFrameMaxObjects(int value) { + frameMaxObjects = Math.max(-1, value); + } + + public List<LogicalVariable> getVariables() { + return variables; + } + + public List<Mutable<ILogicalExpression>> getExpressions() { + return expressions; + } + + @Override + public boolean hasNestedPlans() { + return !nestedPlans.isEmpty(); + } + + @Override + public void recomputeSchema() { + super.recomputeSchema(); + schema.addAll(variables); } @Override @@ -85,19 +213,45 @@ public class WindowOperator extends AbstractAssignOperator { @Override public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { - boolean mod = super.acceptExpressionTransform(visitor); + boolean mod = false; for (Mutable<ILogicalExpression> expr : partitionExpressions) { mod |= visitor.transform(expr); } for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) { mod |= visitor.transform(p.second); } + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) { + mod |= visitor.transform(p.second); + } + for (Mutable<ILogicalExpression> expr : frameStartExpressions) { + mod |= visitor.transform(expr); + } + for (Mutable<ILogicalExpression> expr : frameEndExpressions) { + mod |= visitor.transform(expr); + } + for (Mutable<ILogicalExpression> excludeExpr : frameExcludeExpressions) { + mod |= visitor.transform(excludeExpr); + } + if (frameOffset.getValue() != null) { + mod |= visitor.transform(frameOffset); + } + for (Mutable<ILogicalExpression> expr : expressions) { + mod |= visitor.transform(expr); + } return mod; } @Override public VariablePropagationPolicy getVariablePropagationPolicy() { - return createVariablePropagationPolicy(true); + return new VariablePropagationPolicy() { + @Override + public void propagateVariables(IOperatorSchema target, IOperatorSchema... 
sources) { + target.addAllVariables(sources[0]); + for (LogicalVariable v : variables) { + target.addVariable(v); + } + } + }; } @Override @@ -107,12 +261,44 @@ public class WindowOperator extends AbstractAssignOperator { @Override public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { - IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx); - int n = variables.size(); - for (int i = 0; i < n; i++) { + IVariableTypeEnvironment env = createNestedPlansPropagatingTypeEnvironment(ctx, true); + for (int i = 0, n = variables.size(); i < n; i++) { env.setVarType(variables.get(i), ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(), ctx.getMetadataProvider(), env)); } return env; } + + @Override + public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) { + vars.addAll(variables); + } + + @Override + public void getUsedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) { + for (Mutable<ILogicalExpression> expr : partitionExpressions) { + expr.getValue().getUsedVariables(vars); + } + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) { + p.second.getValue().getUsedVariables(vars); + } + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) { + p.second.getValue().getUsedVariables(vars); + } + for (Mutable<ILogicalExpression> expr : frameStartExpressions) { + expr.getValue().getUsedVariables(vars); + } + for (Mutable<ILogicalExpression> expr : frameEndExpressions) { + expr.getValue().getUsedVariables(vars); + } + for (Mutable<ILogicalExpression> excludeExpr : frameExcludeExpressions) { + excludeExpr.getValue().getUsedVariables(vars); + } + if (frameOffset.getValue() != null) { /* the frameOffset holder is always allocated by the constructor; only its wrapped value may be null (e.g. 
via the two-argument constructor), so test the value, as acceptExpressionTransform does */ + frameOffset.getValue().getUsedVariables(vars); + } + for (Mutable<ILogicalExpression> expr : expressions) { + expr.getValue().getUsedVariables(vars); + } + } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index d91a255..764fe49 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -196,7 +196,7 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole return Boolean.FALSE; } LimitOperator limitOpArg = (LimitOperator) copyAndSubstituteVar(op, arg); - if (op.getOffset() != limitOpArg.getOffset()) { + if (!Objects.equals(op.getOffset().getValue(), limitOpArg.getOffset().getValue())) { return Boolean.FALSE; } boolean isomorphic = op.getMaxObjects().getValue().equals(limitOpArg.getMaxObjects().getValue()); @@ -621,21 +621,45 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole public Boolean visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException { AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; if (aop.getOperatorTag() != LogicalOperatorTag.WINDOW) { - return Boolean.FALSE; + return false; } WindowOperator windowOpArg = (WindowOperator) copyAndSubstituteVar(op, arg); if (!VariableUtilities.varListEqualUnordered(op.getPartitionExpressions(), 
windowOpArg.getPartitionExpressions())) { - return Boolean.FALSE; + return false; } if (!compareIOrderAndExpressions(op.getOrderExpressions(), windowOpArg.getOrderExpressions())) { - return Boolean.FALSE; + return false; + } + if (!compareIOrderAndExpressions(op.getFrameValueExpressions(), windowOpArg.getFrameValueExpressions())) { + return false; + } + if (!compareExpressions(op.getFrameStartExpressions(), windowOpArg.getFrameStartExpressions())) { + return false; + } + if (!compareExpressions(op.getFrameEndExpressions(), windowOpArg.getFrameEndExpressions())) { + return false; + } + if (!compareExpressions(op.getFrameExcludeExpressions(), windowOpArg.getFrameExcludeExpressions())) { + return false; + } + if (op.getFrameExcludeNegationStartIdx() != windowOpArg.getFrameExcludeNegationStartIdx()) { + return false; + } + if (!Objects.equals(op.getFrameOffset().getValue(), windowOpArg.getFrameOffset().getValue())) { + return false; + } + if (op.getFrameMaxObjects() != windowOpArg.getFrameMaxObjects()) { + return false; } if (!VariableUtilities.varListEqualUnordered(getPairList(op.getVariables(), op.getExpressions()), getPairList(windowOpArg.getVariables(), windowOpArg.getExpressions()))) { - return Boolean.FALSE; + return false; } - return Boolean.TRUE; + List<ILogicalPlan> plans = op.getNestedPlans(); + List<ILogicalPlan> plansArg = windowOpArg.getNestedPlans(); + boolean isomorphic = compareSubplans(plans, plansArg); + return isomorphic; } private Boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs, @@ -670,6 +694,19 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole return Boolean.TRUE; } + private boolean compareSubplans(List<ILogicalPlan> plans, List<ILogicalPlan> plansArg) throws AlgebricksException { + int plansSize = plans.size(); + if (plansSize != plansArg.size()) { + return false; + } + for (int i = 0; i < plansSize; i++) { + if (!IsomorphismUtilities.isOperatorIsomorphicPlan(plans.get(i), plansArg.get(i))) 
{ + return false; + } + } + return true; + } + private ILogicalOperator copyAndSubstituteVar(ILogicalOperator op, ILogicalOperator argOp) throws AlgebricksException { ILogicalOperator newOp = OperatorManipulationUtil.deepCopy(argOp); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java index d0aec16..5591053 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java @@ -100,6 +100,7 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito public Void visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException { mapChildren(op, arg); mapVariablesForAbstractAssign(op, arg); + mapVariablesInNestedPlans(op, arg); return null; } @@ -417,8 +418,8 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito } } - private void mapVariablesInNestedPlans(ILogicalOperator opOrigin, ILogicalOperator arg) throws AlgebricksException { - AbstractOperatorWithNestedPlans op = (AbstractOperatorWithNestedPlans) opOrigin; + private void mapVariablesInNestedPlans(AbstractOperatorWithNestedPlans op, ILogicalOperator arg) + throws 
AlgebricksException { AbstractOperatorWithNestedPlans argOp = (AbstractOperatorWithNestedPlans) arg; List<ILogicalPlan> plans = op.getNestedPlans(); List<ILogicalPlan> plansArg = argOp.getNestedPlans(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index 198ffdc..8ab23c2 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -618,11 +618,24 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getPartitionExpressions()); List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprCopy = deepCopyOrderExpressionReferencePairList(op.getOrderExpressions()); + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprCopy = + deepCopyOrderExpressionReferencePairList(op.getFrameValueExpressions()); + List<Mutable<ILogicalExpression>> frameStartExprCopy = + exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameStartExpressions()); + List<Mutable<ILogicalExpression>> frameEndExprCopy = + 
exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameEndExpressions()); + List<Mutable<ILogicalExpression>> frameExclusionExprCopy = + exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getFrameExcludeExpressions()); + ILogicalExpression frameOffsetCopy = exprDeepCopyVisitor.deepCopy(op.getFrameOffset().getValue()); List<LogicalVariable> varCopy = deepCopyVariableList(op.getVariables()); List<Mutable<ILogicalExpression>> exprCopy = exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()); - WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, varCopy, exprCopy); + List<ILogicalPlan> nestedPlansCopy = new ArrayList<>(); + WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, frameValueExprCopy, + frameStartExprCopy, frameEndExprCopy, frameExclusionExprCopy, op.getFrameExcludeNegationStartIdx(), + frameOffsetCopy, op.getFrameMaxObjects(), varCopy, exprCopy, nestedPlansCopy); deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); + deepCopyPlanList(op.getNestedPlans(), nestedPlansCopy, opCopy); return opCopy; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java index 0aaa529..5a566ee 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors; +import java.util.List; + import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; @@ -26,7 +28,6 @@ import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableEvalSizeEnvironment; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractAssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; @@ -96,7 +97,7 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I @Override public Void visitAssignOperator(AssignOperator op, IOptimizationContext context) throws AlgebricksException { - visitAssignment(op, context); + visitAssignment(op, op.getExpressions(), context); return null; } @@ -201,13 +202,13 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I @Override public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext context) throws AlgebricksException { - visitAssignment(op, context); + visitAssignment(op, op.getExpressions(), context); return null; } @Override public Void visitWindowOperator(WindowOperator op, IOptimizationContext context) throws AlgebricksException { - 
visitAssignment(op, context); + visitAssignment(op, op.getExpressions(), context); return null; } @@ -320,7 +321,8 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I return v; } - private void visitAssignment(AbstractAssignOperator op, IOptimizationContext context) throws AlgebricksException { + private void visitAssignment(ILogicalOperator op, List<Mutable<ILogicalExpression>> exprList, + IOptimizationContext context) throws AlgebricksException { LogicalPropertiesVectorImpl v = propagateCardinality(op, context); if (v != null && v.getNumberOfTuples() != null) { IVariableEvalSizeEnvironment varSizeEnv = context.getVariableEvalSizeEnvironment(); @@ -331,7 +333,7 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I if (v0 != null) { long frames0 = v0.getMaxOutputFrames(); long overhead = 0; // added per tuple - for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) { + for (Mutable<ILogicalExpression> exprRef : exprList) { int sz = evalSize.getEvalSize(exprRef.getValue(), varSizeEnv); if (sz == -1) { return; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index e5ca646..cc0879e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -414,12 +414,26 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical deepCopyExpressionRefs(op.getPartitionExpressions(), newPartitionExprs); List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderExprs = deepCopyOrderAndExpression(op.getOrderExpressions()); - - ArrayList<LogicalVariable> newList = new ArrayList<>(); - ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>(); - newList.addAll(op.getVariables()); + List<Pair<IOrder, Mutable<ILogicalExpression>>> newFrameValueExprs = + deepCopyOrderAndExpression(op.getFrameValueExpressions()); + List<Mutable<ILogicalExpression>> newFrameStartExprs = new ArrayList<>(); + deepCopyExpressionRefs(newFrameStartExprs, op.getFrameStartExpressions()); + List<Mutable<ILogicalExpression>> newFrameEndExprs = new ArrayList<>(); + deepCopyExpressionRefs(newFrameEndExprs, op.getFrameEndExpressions()); + List<Mutable<ILogicalExpression>> newFrameExclusionExprs = new ArrayList<>(); + deepCopyExpressionRefs(newFrameExclusionExprs, op.getFrameExcludeExpressions()); + ILogicalExpression newFrameOffset = deepCopyExpressionRef(op.getFrameOffset()).getValue(); + List<LogicalVariable> newVariables = new ArrayList<>(); + deepCopyVars(newVariables, op.getVariables()); + List<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>(); deepCopyExpressionRefs(newExpressions, op.getExpressions()); - - return new WindowOperator(newPartitionExprs, newOrderExprs, newList, newExpressions); + List<ILogicalPlan> newNestedPlans = new ArrayList<>(); + WindowOperator newWinOp = new WindowOperator(newPartitionExprs, newOrderExprs, newFrameValueExprs, + newFrameStartExprs, newFrameEndExprs, newFrameExclusionExprs, op.getFrameExcludeNegationStartIdx(), + newFrameOffset, op.getFrameMaxObjects(), newVariables, newExpressions, newNestedPlans); + 
for (ILogicalPlan nestedPlan : op.getNestedPlans()) { + newNestedPlans.add(OperatorManipulationUtil.deepCopy(nestedPlan, newWinOp)); + } + return newWinOp; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java index 43b7c80..7084a60 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java @@ -93,6 +93,11 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo @Override public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + for (ILogicalPlan p : op.getNestedPlans()) { + for (Mutable<ILogicalOperator> r : p.getRoots()) { + VariableUtilities.getLiveVariables(r.getValue(), producedVariables); + } + } producedVariables.addAll(op.getVariables()); return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java index 69b17ed..bd90729 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java @@ -346,7 +346,15 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void @Override public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { - standardLayout(op); + for (Mutable<ILogicalOperator> c : op.getInputs()) { + VariableUtilities.getLiveVariables(c.getValue(), schemaVariables); + } + for (ILogicalPlan p : op.getNestedPlans()) { + for (Mutable<ILogicalOperator> r : p.getRoots()) { + VariableUtilities.getLiveVariables(r.getValue(), schemaVariables); + } + } + schemaVariables.addAll(op.getVariables()); return null; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java index 99d3488..e9f82ef 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java @@ -29,6 +29,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestNonMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; @@ -154,11 +155,7 @@ public class SubstituteVariableVisitor throws AlgebricksException { subst(pair.first, pair.second, op.getGroupByList()); subst(pair.first, pair.second, op.getDecorList()); - for (ILogicalPlan p : op.getNestedPlans()) { - for (Mutable<ILogicalOperator> r : p.getRoots()) { - VariableUtilities.substituteVariablesInDescendantsAndSelf(r.getValue(), pair.first, pair.second, ctx); - } - } + substInNestedPlans(pair.first, pair.second, op); substVarTypes(op, pair); return null; } @@ -248,11 +245,7 @@ public class SubstituteVariableVisitor @Override public Void visitSubplanOperator(SubplanOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { - for (ILogicalPlan p : op.getNestedPlans()) { - for (Mutable<ILogicalOperator> r : p.getRoots()) { - VariableUtilities.substituteVariablesInDescendantsAndSelf(r.getValue(), pair.first, pair.second, ctx); - } - } + substInNestedPlans(pair.first, pair.second, op); return 
null; } @@ -380,6 +373,15 @@ public class SubstituteVariableVisitor } } + private void substInNestedPlans(LogicalVariable v1, LogicalVariable v2, AbstractOperatorWithNestedPlans op) + throws AlgebricksException { + for (ILogicalPlan p : op.getNestedPlans()) { + for (Mutable<ILogicalOperator> r : p.getRoots()) { + VariableUtilities.substituteVariablesInDescendantsAndSelf(r.getValue(), v1, v2, ctx); + } + } + } + private void substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions, Pair<LogicalVariable, LogicalVariable> pair) { int n = variables.size(); @@ -503,13 +505,30 @@ public class SubstituteVariableVisitor @Override public Void visitWindowOperator(WindowOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { - for (Mutable<ILogicalExpression> pe : op.getPartitionExpressions()) { - pe.getValue().substituteVar(pair.first, pair.second); + for (Mutable<ILogicalExpression> expr : op.getPartitionExpressions()) { + expr.getValue().substituteVar(pair.first, pair.second); } - for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { - oe.second.getValue().substituteVar(pair.first, pair.second); + for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { + p.second.getValue().substituteVar(pair.first, pair.second); + } + for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getFrameValueExpressions()) { + p.second.getValue().substituteVar(pair.first, pair.second); + } + for (Mutable<ILogicalExpression> expr : op.getFrameStartExpressions()) { + expr.getValue().substituteVar(pair.first, pair.second); + } + for (Mutable<ILogicalExpression> expr : op.getFrameEndExpressions()) { + expr.getValue().substituteVar(pair.first, pair.second); + } + for (Mutable<ILogicalExpression> expr : op.getFrameExcludeExpressions()) { + expr.getValue().substituteVar(pair.first, pair.second); + } + ILogicalExpression frameOffset = op.getFrameOffset().getValue(); + if (frameOffset 
!= null) { + frameOffset.substituteVar(pair.first, pair.second); } substAssignVariables(op.getVariables(), op.getExpressions(), pair); + substInNestedPlans(pair.first, pair.second, op); substVarTypes(op, pair); return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index b4bea84..ae6ab07 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -32,6 +32,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; @@ -184,11 +185,7 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> @Override public Void 
visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException { - for (ILogicalPlan p : op.getNestedPlans()) { - for (Mutable<ILogicalOperator> r : p.getRoots()) { - VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables); - } - } + visitNestedPlans(op); for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getGroupByList()) { g.second.getValue().getUsedVariables(usedVariables); } @@ -272,11 +269,7 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> @Override public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException { - for (ILogicalPlan p : op.getNestedPlans()) { - for (Mutable<ILogicalOperator> r : p.getRoots()) { - VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables); - } - } + visitNestedPlans(op); return null; } @@ -473,16 +466,41 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> } @Override - public Void visitWindowOperator(WindowOperator op, Void arg) { + public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + visitNestedPlans(op); for (Mutable<ILogicalExpression> exprRef : op.getPartitionExpressions()) { exprRef.getValue().getUsedVariables(usedVariables); } - for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { - oe.second.getValue().getUsedVariables(usedVariables); + for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { + p.second.getValue().getUsedVariables(usedVariables); + } + for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getFrameValueExpressions()) { + p.second.getValue().getUsedVariables(usedVariables); + } + for (Mutable<ILogicalExpression> exprRef : op.getFrameStartExpressions()) { + exprRef.getValue().getUsedVariables(usedVariables); + } + for (Mutable<ILogicalExpression> exprRef : op.getFrameEndExpressions()) { + exprRef.getValue().getUsedVariables(usedVariables); + } + for 
(Mutable<ILogicalExpression> exprRef : op.getFrameExcludeExpressions()) { + exprRef.getValue().getUsedVariables(usedVariables); + } + ILogicalExpression frameOffset = op.getFrameOffset().getValue(); + if (frameOffset != null) { + frameOffset.getUsedVariables(usedVariables); } for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) { exprRef.getValue().getUsedVariables(usedVariables); } return null; } + + private void visitNestedPlans(AbstractOperatorWithNestedPlans op) throws AlgebricksException { + for (ILogicalPlan p : op.getNestedPlans()) { + for (Mutable<ILogicalOperator> r : p.getRoots()) { + VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables); + } + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java index 09ee358..e1dd8db 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java @@ -90,7 +90,7 @@ public class RunningAggregatePOperator extends AbstractPhysicalOperator { int[] projectionList = JobGenHelper.projectAllVariables(opSchema); RunningAggregateRuntimeFactory runtime = - new RunningAggregateRuntimeFactory(outColumns, runningAggFuns, projectionList); + new RunningAggregateRuntimeFactory(projectionList, 
outColumns, runningAggFuns); runtime.setSourceLocation(ragg.getSourceLocation()); // contribute one Asterix framewriter http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java index 7853524..7389d8e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java @@ -23,10 +23,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.function.Function; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.ListSet; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -34,6 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; @@ -50,8 +53,15 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.operators.aggrun.WindowRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowMaterializingRuntimeFactory; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.ErrorCode; @@ -114,7 +124,8 @@ public class WindowPOperator extends AbstractPhysicalOperator { for (LogicalVariable pColumn : pcVars) { lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC)); } - List<ILocalStructuralProperty> localProps = 
Collections.singletonList(new LocalOrderProperty(lopColumns)); + List<ILocalStructuralProperty> localProps = + lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns)); return new PhysicalRequirements( new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) }, @@ -132,34 +143,85 @@ public class WindowPOperator extends AbstractPhysicalOperator { IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { WindowOperator winOp = (WindowOperator) op; - int[] outColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables()); - List<Mutable<ILogicalExpression>> expressions = winOp.getExpressions(); - IRunningAggregateEvaluatorFactory[] winFuncs = new IRunningAggregateEvaluatorFactory[expressions.size()]; - IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); - for (int i = 0; i < winFuncs.length; i++) { - StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) expressions.get(i).getValue(); - winFuncs[i] = expressionRuntimeProvider.createRunningAggregateFunctionFactory(expr, - context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context); - } - - // TODO push projections into the operator - int[] projectionList = JobGenHelper.projectAllVariables(opSchema); - int[] partitionColumnList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns); + int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns); - IBinaryComparatorFactory[] partitionComparatorFactories = JobGenHelper - .variablesToAscBinaryComparatorFactories(partitionColumns, context.getTypeEnvironment(op), context); + IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op); + IBinaryComparatorFactory[] partitionComparatorFactories = + JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context); //TODO not all functions 
need order comparators - IBinaryComparatorFactory[] orderComparatorFactories = JobGenHelper - .variablesToBinaryComparatorFactories(orderColumns, context.getTypeEnvironment(op), context); + IBinaryComparatorFactory[] orderComparatorFactories = + JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context); + + IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue()); + IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider(); + IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider(); + + IScalarEvaluatorFactory[] frameStartExprEvals = createEvaluatorFactories(winOp.getFrameStartExpressions(), + inputSchemas, inputTypeEnv, exprRuntimeProvider, context); + + IScalarEvaluatorFactory[] frameEndExprEvals = createEvaluatorFactories(winOp.getFrameEndExpressions(), + inputSchemas, inputTypeEnv, exprRuntimeProvider, context); + + Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators = + createEvaluatorAndComparatorFactories(winOp.getFrameValueExpressions(), Pair::getSecond, Pair::getFirst, + inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context); - WindowRuntimeFactory runtime = new WindowRuntimeFactory(outColumns, winFuncs, projectionList, - partitionColumnList, partitionComparatorFactories, partitionMaterialization, orderComparatorFactories); + Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators = + createEvaluatorAndComparatorFactories(winOp.getFrameExcludeExpressions(), v -> v, + v -> OrderOperator.ASC_ORDER, inputSchemas, inputTypeEnv, exprRuntimeProvider, + binaryComparatorFactoryProvider, context); + + IScalarEvaluatorFactory frameOffsetExprEval = null; + ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue(); + if (frameOffsetExpr != null) { + frameOffsetExprEval = + 
exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context); + } + + int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema); + + int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables()); + + List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions(); + int runningAggExprCount = runningAggExprs.size(); + IRunningAggregateEvaluatorFactory[] runningAggFactories = + new IRunningAggregateEvaluatorFactory[runningAggExprCount]; + for (int i = 0; i < runningAggExprCount; i++) { + StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue(); + runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv, + inputSchemas, context); + } + + AbstractWindowRuntimeFactory runtime; + if (winOp.hasNestedPlans()) { + int opSchemaSizePreSubplans = opSchema.getSize(); + AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context); + int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans; + WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans); + nestedAggFactory.setSourceLocation(winOp.getSourceLocation()); + runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, frameValueExprEvalsAndComparators.first, + frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameEndExprEvals, + frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(), + frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, + context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(), + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, + aggregatorOutputSchemaSize, nestedAggFactory); + } else if (partitionMaterialization) { + runtime = new 
WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns, + runningAggFactories); + } else { + runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns, + runningAggFactories); + } runtime.setSourceLocation(winOp.getSourceLocation()); // contribute one Asterix framewriter - RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context); builder.contributeMicroOperator(winOp, runtime, recDesc); // and contribute one edge from its child ILogicalOperator src = winOp.getInputs().get(0).getValue(); @@ -180,6 +242,44 @@ public class WindowPOperator extends AbstractPhysicalOperator { return partitionMaterialization; } + private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList, + IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv, + IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException { + if (exprList.isEmpty()) { + return null; + } + int ln = exprList.size(); + IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln]; + for (int i = 0; i < ln; i++) { + ILogicalExpression expr = exprList.get(i).getValue(); + evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context); + } + return evals; + } + + private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories( + List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter, + Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas, + IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider, + 
IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context) + throws AlgebricksException { + if (exprList.isEmpty()) { + return new Pair<>(null, null); + } + int ln = exprList.size(); + IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln]; + IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln]; + for (int i = 0; i < ln; i++) { + T exprObj = exprList.get(i); + ILogicalExpression expr = exprGetter.apply(exprObj).getValue(); + OrderOperator.IOrder order = orderGetter.apply(exprObj); + evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context); + comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr), + order.getKind() == OrderOperator.IOrder.OrderKind.ASC); + } + return new Pair<>(evals, comparators); + } + private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) { for (int i = startIdx, ln = ocList.size(); i < ln; i++) { if (varSet.contains(ocList.get(i).getColumn())) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index ad45614..62b935b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -486,10 +486,57 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("window ").append(str(op.getVariables())).append(" <- "); pprintExprList(op.getExpressions(), indent); - buffer.append(" partition "); - pprintExprList(op.getPartitionExpressions(), indent); - buffer.append(" order "); - pprintOrderList(op.getOrderExpressions(), indent); + if (!op.getPartitionExpressions().isEmpty()) { + buffer.append(" partition "); + pprintExprList(op.getPartitionExpressions(), indent); + } + if (!op.getOrderExpressions().isEmpty()) { + buffer.append(" order "); + pprintOrderList(op.getOrderExpressions(), indent); + } + if (op.hasNestedPlans()) { + buffer.append(" frame on "); + pprintOrderList(op.getFrameValueExpressions(), indent); + buffer.append("start "); + List<Mutable<ILogicalExpression>> frameStartExpressions = op.getFrameStartExpressions(); + if (!frameStartExpressions.isEmpty()) { + pprintExprList(frameStartExpressions, indent); + } else { + buffer.append("unbounded"); + } + buffer.append(" end "); + List<Mutable<ILogicalExpression>> frameEndExpressions = op.getFrameEndExpressions(); + if (!frameEndExpressions.isEmpty()) { + pprintExprList(frameEndExpressions, indent); + } else { + buffer.append("unbounded"); + } + List<Mutable<ILogicalExpression>> frameExcludeExpressions = op.getFrameExcludeExpressions(); + if (!frameExcludeExpressions.isEmpty()) { + buffer.append(" exclude "); + int negStartIdx = op.getFrameExcludeNegationStartIdx(); + if (negStartIdx >= 0 && op.getFrameExcludeNegationStartIdx() < frameExcludeExpressions.size()) { + pprintExprList(frameExcludeExpressions.subList(0, negStartIdx), indent); + buffer.append(" and not "); + 
pprintExprList(frameExcludeExpressions.subList(negStartIdx, frameExcludeExpressions.size()), + indent); + } else { + pprintExprList(frameExcludeExpressions, indent); + } + } + Mutable<ILogicalExpression> frameOffset = op.getFrameOffset(); + if (frameOffset.getValue() != null) { + buffer.append(" offset "); + buffer.append(frameOffset.getValue().accept(exprVisitor, indent)); + } + int frameMaxObjects = op.getFrameMaxObjects(); + if (frameMaxObjects != -1) { + buffer.append(" maxObjects " + frameMaxObjects); + } + + buffer.append(" {"); + printNestedPlans(op, indent); + } return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index 4c810ab..8ba2aff 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@ -661,9 +661,50 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat addIndent(indent).append("\"partition by\": "); pprintExprList(op.getPartitionExpressions(), indent); } - buffer.append(",\n"); - addIndent(indent).append("\"order by\": "); - pprintOrderExprList(op.getOrderExpressions(), -1, indent); + if (!op.getOrderExpressions().isEmpty()) { + buffer.append(",\n"); + 
addIndent(indent).append("\"order by\": "); + pprintOrderExprList(op.getOrderExpressions(), -1, indent); + } + if (op.hasNestedPlans()) { + buffer.append(",\n"); + addIndent(indent).append("\"frame on\": "); + pprintOrderExprList(op.getFrameValueExpressions(), -1, indent); + List<Mutable<ILogicalExpression>> frameStartExpressions = op.getFrameStartExpressions(); + if (!frameStartExpressions.isEmpty()) { + buffer.append(",\n"); + addIndent(indent).append("\"frame start\": "); + pprintExprList(frameStartExpressions, indent); + } + List<Mutable<ILogicalExpression>> frameEndExpressions = op.getFrameEndExpressions(); + if (!frameEndExpressions.isEmpty()) { + buffer.append(",\n"); + addIndent(indent).append("\"frame end\": "); + pprintExprList(frameEndExpressions, indent); + } + List<Mutable<ILogicalExpression>> frameExcludeExpressions = op.getFrameExcludeExpressions(); + if (!frameExcludeExpressions.isEmpty()) { + buffer.append(",\n"); + addIndent(indent).append("\"frame exclude\": "); + pprintExprList(frameExcludeExpressions, indent); + addIndent(indent).append("\"frame exclude negation start\": ") + .append(String.valueOf(op.getFrameExcludeNegationStartIdx())); + } + Mutable<ILogicalExpression> frameOffset = op.getFrameOffset(); + if (frameOffset.getValue() != null) { + buffer.append(",\n"); + addIndent(indent).append("\"frame offset\": "); + pprintExpr(frameOffset, indent); + } + int frameMaxObjects = op.getFrameMaxObjects(); + if (frameMaxObjects != -1) { + buffer.append(",\n"); + addIndent(indent).append("\"frame maxObjects\": " + frameMaxObjects); + } + + addIndent(indent).append("\"subplan\": "); + printNestedPlans(op, indent); + } return null; } @@ -693,11 +734,15 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat } else { buffer.append(", "); } - buffer.append(exprRef.getValue().accept(exprVisitor, indent).replace('"', ' ')); + pprintExpr(exprRef, indent); } buffer.append("\""); } + protected void 
pprintExpr(Mutable<ILogicalExpression> exprRef, Integer indent) throws AlgebricksException { + buffer.append(exprRef.getValue().accept(exprVisitor, indent).replace('"', ' ')); + } + protected void pprintVeList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList, Integer indent) throws AlgebricksException { buffer.append("["); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index ea914fa..4d05a1b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -28,8 +28,9 @@ import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -42,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSo import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor; @@ -215,7 +217,7 @@ public class OperatorManipulationUtil { LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx, true); ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root); - return Pair.of(newRoot, deepCopyVisitor.getInputToOutputVariableMapping()); + return new Pair<>(newRoot, deepCopyVisitor.getInputToOutputVariableMapping()); } private static void setDataSource(ILogicalPlan plan, ILogicalOperator dataSource) { @@ -380,4 +382,29 @@ public class OperatorManipulationUtil { } return -1; } + + public static List<Mutable<ILogicalExpression>> cloneExpressions(List<Mutable<ILogicalExpression>> exprList) { + if (exprList == null) { + return null; + } + List<Mutable<ILogicalExpression>> clonedExprList = new ArrayList<>(exprList.size()); + for (Mutable<ILogicalExpression> expr : exprList) { + clonedExprList.add(new MutableObject<>(expr.getValue().cloneExpression())); + } + return clonedExprList; + } + + public static List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> cloneOrderExpressions( + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprList) { + if (orderExprList == null) { + return null; + } + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> clonedExprList = + new ArrayList<>(orderExprList.size()); + for 
(Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpr : orderExprList) { + clonedExprList.add( + new Pair<>(orderExpr.first, new MutableObject<>(orderExpr.second.getValue().cloneExpression()))); + } + return clonedExprList; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java index f10c3a4..f50ca3a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java @@ -188,14 +188,11 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String public String visitOrderOperator(OrderOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("order "); - for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { - if (op.getTopK() != -1) { - stringBuilder.append("(topK: ").append(op.getTopK()).append(") "); - } - stringBuilder.append("("); - appendOrder(p.first); - stringBuilder.append(", ").append(p.second.getValue().toString()).append(") "); + int topK = op.getTopK(); + if (topK != -1) { + stringBuilder.append("(topK: ").append(topK).append(") "); } + printOrderExprList(op.getOrderExpressions()); appendSchema(op, showDetails); appendAnnotations(op, showDetails); appendPhysicalOperatorInfo(op, showDetails); @@ -608,10 +605,36 @@ public class 
LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String stringBuilder.append(") partition by ("); printExprList(op.getPartitionExpressions()); stringBuilder.append(") order by ("); - for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { - stringBuilder.append("("); - appendOrder(p.first); - stringBuilder.append(", ").append(p.second.getValue().toString()).append(") "); + printOrderExprList(op.getOrderExpressions()); + if (op.hasNestedPlans()) { + stringBuilder.append(") frame on ("); + printOrderExprList(op.getFrameValueExpressions()); + List<Mutable<ILogicalExpression>> frameStartExpressions = op.getFrameStartExpressions(); + if (!frameStartExpressions.isEmpty()) { + stringBuilder.append(") frame start ("); + printExprList(frameStartExpressions); + } + List<Mutable<ILogicalExpression>> frameEndExpressions = op.getFrameEndExpressions(); + if (!frameEndExpressions.isEmpty()) { + stringBuilder.append(") frame end ("); + printExprList(frameEndExpressions); + } + List<Mutable<ILogicalExpression>> frameExcludeExpressions = op.getFrameExcludeExpressions(); + if (!frameExcludeExpressions.isEmpty()) { + stringBuilder.append(") frame exclude ("); + stringBuilder.append(" (negation start: ").append(op.getFrameExcludeNegationStartIdx()).append(") "); + printExprList(frameExcludeExpressions); + } + Mutable<ILogicalExpression> frameOffset = op.getFrameOffset(); + if (frameOffset.getValue() != null) { + stringBuilder.append(") frame offset ("); + stringBuilder.append(frameOffset.getValue()); + stringBuilder.append(") "); + } + int frameMaxObjects = op.getFrameMaxObjects(); + if (frameMaxObjects != -1) { + stringBuilder.append("(frame maxObjects: ").append(frameMaxObjects).append(") "); + } } stringBuilder.append(")"); appendSchema(op, showDetails); @@ -644,6 +667,14 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String stringBuilder.append("]"); } + private void 
printOrderExprList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprList) { + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExprList) { + stringBuilder.append("("); + appendOrder(p.first); + stringBuilder.append(", ").append(p.second.getValue().toString()).append(") "); + } + } + private void appendSchema(AbstractLogicalOperator op, boolean show) { if (show) { stringBuilder.append("\\nSchema: "); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java index ad0a9da..b8dd24f 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java @@ -92,6 +92,7 @@ public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule { ignoreOps.add(LogicalOperatorTag.PROJECT); ignoreOps.add(LogicalOperatorTag.AGGREGATE); ignoreOps.add(LogicalOperatorTag.RUNNINGAGGREGATE); + ignoreOps.add(LogicalOperatorTag.WINDOW); //TODO: can extract from partition/order/frame expressions } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java index 2c825b7..dc9a11f 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java @@ -19,7 +19,7 @@ package org.apache.hyracks.algebricks.rewriter.rules; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -52,7 +52,7 @@ public class InlineSingleReferenceVariablesRule extends InlineVariablesRule { // Maps from variable to a list of operators using that variable. 
protected Map<LogicalVariable, List<ILogicalOperator>> usedVarsMap = - new HashMap<LogicalVariable, List<ILogicalOperator>>(); + new LinkedHashMap<LogicalVariable, List<ILogicalOperator>>(); protected List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>(); @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java index 3987380..f906f9b 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java @@ -26,15 +26,40 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; public class IntroduceAggregateCombinerRule extends AbstractIntroduceCombinerRule { @Override + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (context.checkIfInDontApplySet(this, op)) { + return false; + } + // Disable aggregate combiners for nested plans inside window operators + if (op.getOperatorTag() == LogicalOperatorTag.WINDOW) { + WindowOperator winOp = (WindowOperator) op; + if (winOp.hasNestedPlans()) { + for (ILogicalPlan plan : winOp.getNestedPlans()) { + for (Mutable<ILogicalOperator> root : plan.getRoots()) { + ILogicalOperator rootOp = root.getValue(); + if (rootOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) { + context.addToDontApplySet(this, rootOp); + } + } + } + } + } + return false; + } + + @Override public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java index 3efa46b..8e41a15 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java +++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java @@ -31,9 +31,9 @@ import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.ListSet; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -184,11 +184,11 @@ public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule { // Copy the original nested pipeline inside the group-by. Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedAggOpAndVarMap = OperatorManipulationUtil.deepCopyWithNewVars(aggOp, context); - ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.getLeft(); + ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.first; // Substitutes variables in the upper nested pipe line. VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(), - copiedAggOpAndVarMap.getRight(), context); + copiedAggOpAndVarMap.second, context); // Does the actual push. Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
