Repository: asterixdb Updated Branches: refs/heads/master 8efd847db -> faf9791d2
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index 764fe49..a39af41 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -624,32 +624,10 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole return false; } WindowOperator windowOpArg = (WindowOperator) copyAndSubstituteVar(op, arg); - if (!VariableUtilities.varListEqualUnordered(op.getPartitionExpressions(), - windowOpArg.getPartitionExpressions())) { + if (!compareWindowPartitionSpec(op, windowOpArg)) { return false; } - if (!compareIOrderAndExpressions(op.getOrderExpressions(), windowOpArg.getOrderExpressions())) { - return false; - } - if (!compareIOrderAndExpressions(op.getFrameValueExpressions(), windowOpArg.getFrameValueExpressions())) { - return false; - } - if (!compareExpressions(op.getFrameStartExpressions(), windowOpArg.getFrameStartExpressions())) { - return false; - } - if (!compareExpressions(op.getFrameEndExpressions(), windowOpArg.getFrameEndExpressions())) { - return false; - } - if (!compareExpressions(op.getFrameExcludeExpressions(), windowOpArg.getFrameExcludeExpressions())) { - return false; - } - if 
(op.getFrameExcludeNegationStartIdx() != windowOpArg.getFrameExcludeNegationStartIdx()) { - return false; - } - if (!Objects.equals(op.getFrameOffset().getValue(), windowOpArg.getFrameOffset().getValue())) { - return false; - } - if (op.getFrameMaxObjects() != windowOpArg.getFrameMaxObjects()) { + if (!compareWindowFrameSpec(op, windowOpArg)) { return false; } if (!VariableUtilities.varListEqualUnordered(getPairList(op.getVariables(), op.getExpressions()), @@ -662,36 +640,52 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole return isomorphic; } - private Boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs, + public static boolean compareWindowPartitionSpec(WindowOperator winOp1, WindowOperator winOp2) { + return VariableUtilities.varListEqualUnordered(winOp1.getPartitionExpressions(), + winOp2.getPartitionExpressions()) + && compareIOrderAndExpressions(winOp1.getOrderExpressions(), winOp2.getOrderExpressions()); + } + + public static boolean compareWindowFrameSpec(WindowOperator winOp1, WindowOperator winOp2) { + return compareIOrderAndExpressions(winOp1.getFrameValueExpressions(), winOp2.getFrameValueExpressions()) + && compareExpressions(winOp1.getFrameStartExpressions(), winOp2.getFrameStartExpressions()) + && compareExpressions(winOp1.getFrameEndExpressions(), winOp2.getFrameEndExpressions()) + && compareExpressions(winOp1.getFrameExcludeExpressions(), winOp2.getFrameExcludeExpressions()) + && winOp1.getFrameExcludeNegationStartIdx() == winOp2.getFrameExcludeNegationStartIdx() + && Objects.equals(winOp1.getFrameOffset().getValue(), winOp2.getFrameOffset().getValue()) + && winOp1.getFrameMaxObjects() == winOp2.getFrameMaxObjects(); + } + + private static boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs, List<Mutable<ILogicalExpression>> argExprs) { if (opExprs.size() != argExprs.size()) { - return Boolean.FALSE; + return false; } for (int i = 0; i < opExprs.size(); i++) { boolean isomorphic = 
opExprs.get(i).getValue().equals(argExprs.get(i).getValue()); if (!isomorphic) { - return Boolean.FALSE; + return false; } } - return Boolean.TRUE; + return true; } - private Boolean compareIOrderAndExpressions(List<Pair<IOrder, Mutable<ILogicalExpression>>> opOrderExprs, + private static boolean compareIOrderAndExpressions(List<Pair<IOrder, Mutable<ILogicalExpression>>> opOrderExprs, List<Pair<IOrder, Mutable<ILogicalExpression>>> argOrderExprs) { if (opOrderExprs.size() != argOrderExprs.size()) { - return Boolean.FALSE; + return false; } for (int i = 0; i < opOrderExprs.size(); i++) { boolean isomorphic = opOrderExprs.get(i).first.equals(argOrderExprs.get(i).first); if (!isomorphic) { - return Boolean.FALSE; + return false; } isomorphic = opOrderExprs.get(i).second.getValue().equals(argOrderExprs.get(i).second.getValue()); if (!isomorphic) { - return Boolean.FALSE; + return false; } } - return Boolean.TRUE; + return true; } private boolean compareSubplans(List<ILogicalPlan> plans, List<ILogicalPlan> plansArg) throws AlgebricksException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java index 5591053..eca2508 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java @@ -99,7 +99,7 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito @Override public Void visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException { mapChildren(op, arg); - mapVariablesForAbstractAssign(op, arg); + mapVariablesForWindow(op, arg); mapVariablesInNestedPlans(op, arg); return null; } @@ -359,7 +359,7 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito rightOp.getExpressions()); } - private void mapVariablesForGroupBy(ILogicalOperator left, ILogicalOperator right) throws AlgebricksException { + private void mapVariablesForGroupBy(ILogicalOperator left, ILogicalOperator right) { if (left.getOperatorTag() != right.getOperatorTag()) { return; } @@ -373,6 +373,16 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito mapVarExprPairList(leftPairs, rightPairs); } + private void mapVariablesForWindow(ILogicalOperator left, ILogicalOperator right) { + if (left.getOperatorTag() != right.getOperatorTag()) { + return; + } + WindowOperator leftOp = (WindowOperator) left; + WindowOperator rightOp = (WindowOperator) right; + mapVariablesForAbstractAssign(leftOp.getVariables(), leftOp.getExpressions(), rightOp.getVariables(), + rightOp.getExpressions()); + } + private void mapVarExprPairList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> leftPairs, List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rightPairs) { if (leftPairs.size() != rightPairs.size()) { @@ -420,6 +430,9 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito private void mapVariablesInNestedPlans(AbstractOperatorWithNestedPlans op, ILogicalOperator arg) throws AlgebricksException { + if (op.getOperatorTag() != arg.getOperatorTag()) { + return; + } 
AbstractOperatorWithNestedPlans argOp = (AbstractOperatorWithNestedPlans) arg; List<ILogicalPlan> plans = op.getNestedPlans(); List<ILogicalPlan> plansArg = argOp.getNestedPlans(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index 4d05a1b..967ad6f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -36,6 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; @@ -407,4 +408,25 @@ public class OperatorManipulationUtil { } return clonedExprList; } + + /** + * Finds a variable assigned to a given expression and returns a new {@link VariableReferenceExpression} + * referring to this variable. 
+ * @param assignVarList list of variables + * @param assignExprList list of expressions assigned to those variables + * @param searchExpr expression to search for + * @return said value, {@code null} if a variable is not found + */ + public static VariableReferenceExpression findAssignedVariable(List<LogicalVariable> assignVarList, + List<Mutable<ILogicalExpression>> assignExprList, ILogicalExpression searchExpr) { + for (int i = 0, n = assignExprList.size(); i < n; i++) { + ILogicalExpression expr = assignExprList.get(i).getValue(); + if (expr.equals(searchExpr)) { + VariableReferenceExpression result = new VariableReferenceExpression(assignVarList.get(i)); + result.setSourceLocation(expr.getSourceLocation()); + return result; + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java new file mode 100644 index 0000000..61880a2 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.rewriter.rules; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismOperatorVisitor; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +/** + * Merges two adjacent window operators into one if their window specifications are compatible. 
+ * <pre> + * window [$x] <- [f()] with nested plan (aggregate [$a] <- [agg_1()] - ... - nts ) + * window [$y] <- [g()] with nested plan (aggregate [$b] <- [agg_2()] - ... - nts ) + * --> + * window [$x, $y] <- [f(), g()] with nested plan ( aggregate [$a, $b] <- [agg_1(), agg_2()] - ... - nts ) + * </pre> + */ +public class ConsolidateWindowOperatorsRule implements IAlgebraicRewriteRule { + + @Override + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { + return false; + } + + @Override + public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue(); + if (op1.getOperatorTag() != LogicalOperatorTag.WINDOW) { + return false; + } + WindowOperator winOp1 = (WindowOperator) op1; + + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); + if (op2.getOperatorTag() != LogicalOperatorTag.WINDOW) { + return false; + } + + WindowOperator winOp2 = (WindowOperator) op2; + + if (!IsomorphismOperatorVisitor.compareWindowPartitionSpec(winOp1, winOp2)) { + return false; + } + if (winOp1.hasNestedPlans() && winOp2.hasNestedPlans() + && !IsomorphismOperatorVisitor.compareWindowFrameSpec(winOp1, winOp2)) { + return false; + } + + Set<LogicalVariable> used1 = new HashSet<>(); + VariableUtilities.getUsedVariables(winOp1, used1); + if (!OperatorPropertiesUtil.disjoint(winOp2.getVariables(), used1)) { + return false; + } + + if (winOp2.hasNestedPlans() && !consolidateNestedPlans(winOp1, winOp2, context)) { + return false; + } + + winOp1.getVariables().addAll(winOp2.getVariables()); + winOp1.getExpressions().addAll(winOp2.getExpressions()); + + winOp1.getInputs().clear(); + winOp1.getInputs().addAll(winOp2.getInputs()); + context.computeAndSetTypeEnvironmentForOperator(winOp1); + + return true; + } + + private boolean consolidateNestedPlans(WindowOperator winOpTo, WindowOperator
winOpFrom, + IOptimizationContext context) throws AlgebricksException { + if (winOpTo.hasNestedPlans()) { + AggregateOperator aggTo = getAggregateRoot(winOpTo.getNestedPlans()); + if (aggTo == null) { + return false; + } + AggregateOperator aggFrom = getAggregateRoot(winOpFrom.getNestedPlans()); + if (aggFrom == null) { + return false; + } + if (!IsomorphismUtilities.isOperatorIsomorphicPlanSegment(aggTo.getInputs().get(0).getValue(), + aggFrom.getInputs().get(0).getValue())) { + return false; + } + aggTo.getVariables().addAll(aggFrom.getVariables()); + aggTo.getExpressions().addAll(aggFrom.getExpressions()); + context.computeAndSetTypeEnvironmentForOperator(aggTo); + } else { + setAll(winOpTo.getNestedPlans(), winOpFrom.getNestedPlans()); + setAll(winOpTo.getFrameValueExpressions(), winOpFrom.getFrameValueExpressions()); + setAll(winOpTo.getFrameStartExpressions(), winOpFrom.getFrameStartExpressions()); + setAll(winOpTo.getFrameEndExpressions(), winOpFrom.getFrameEndExpressions()); + setAll(winOpTo.getFrameExcludeExpressions(), winOpFrom.getFrameExcludeExpressions()); + winOpTo.setFrameExcludeNegationStartIdx(winOpFrom.getFrameExcludeNegationStartIdx()); + winOpTo.getFrameOffset().setValue(winOpFrom.getFrameOffset().getValue()); + winOpTo.setFrameMaxObjects(winOpFrom.getFrameMaxObjects()); + } + return true; + } + + private AggregateOperator getAggregateRoot(List<ILogicalPlan> nestedPlans) { + if (nestedPlans.size() != 1) { + return null; + } + List<Mutable<ILogicalOperator>> roots = nestedPlans.get(0).getRoots(); + if (roots.size() != 1) { + return null; + } + ILogicalOperator rootOp = roots.get(0).getValue(); + if (rootOp.getOperatorTag() != LogicalOperatorTag.AGGREGATE) { + return null; + } + return (AggregateOperator) rootOp; + } + + private <T> void setAll(Collection<? super T> to, Collection<? 
extends T> from) { + if (!to.isEmpty()) { + throw new IllegalStateException(String.valueOf(to.size())); + } + to.addAll(from); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java index d0dff03..a97c38b 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java @@ -34,9 +34,9 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.visitors.AbstractConstVarFunVisitor; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; @@ -51,46 +51,69 @@ public class InlineAssignIntoAggregateRule 
implements IAlgebraicRewriteRule { public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); - if (op.getOperatorTag() != LogicalOperatorTag.GROUP) { + if (op.getOperatorTag() != LogicalOperatorTag.GROUP && op.getOperatorTag() != LogicalOperatorTag.WINDOW) { return false; } boolean changed = false; - GroupByOperator gbyOp = (GroupByOperator) op; - for (ILogicalPlan p : gbyOp.getNestedPlans()) { + AbstractOperatorWithNestedPlans opWithNestedPlan = (AbstractOperatorWithNestedPlans) op; + for (ILogicalPlan p : opWithNestedPlan.getNestedPlans()) { for (Mutable<ILogicalOperator> r : p.getRoots()) { - if (inlined(r)) { - changed = true; - } + changed |= inlined(r.getValue(), opWithNestedPlan); } } return changed; } - private boolean inlined(Mutable<ILogicalOperator> r) throws AlgebricksException { - AbstractLogicalOperator op1 = (AbstractLogicalOperator) r.getValue(); + private boolean inlined(ILogicalOperator planRootOp, AbstractOperatorWithNestedPlans opWithNestedPlan) + throws AlgebricksException { + AbstractLogicalOperator op1 = (AbstractLogicalOperator) planRootOp; if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) { return false; } - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); + AggregateOperator aggOp = (AggregateOperator) op1; + boolean inlined = inlineInputAssignIntoAgg(aggOp); + if (opWithNestedPlan.getOperatorTag() == LogicalOperatorTag.WINDOW) { + inlined |= inlineOuterInputAssignIntoAgg(aggOp, opWithNestedPlan); + } + return inlined; + } + + private boolean inlineInputAssignIntoAgg(AggregateOperator aggOp) throws AlgebricksException { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) aggOp.getInputs().get(0).getValue(); if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) { return false; } - AggregateOperator agg = (AggregateOperator) op1; - AssignOperator assign 
= (AssignOperator) op2; - VarExprSubstitution ves = new VarExprSubstitution(assign.getVariables(), assign.getExpressions()); - for (Mutable<ILogicalExpression> exprRef : agg.getExpressions()) { + AssignOperator assignOp = (AssignOperator) op2; + VarExprSubstitution ves = new VarExprSubstitution(assignOp.getVariables(), assignOp.getExpressions()); + inlineVariables(aggOp, ves); + List<Mutable<ILogicalOperator>> op1InpList = aggOp.getInputs(); + op1InpList.clear(); + op1InpList.add(op2.getInputs().get(0)); + return true; + } + + private boolean inlineOuterInputAssignIntoAgg(AggregateOperator aggOp, + AbstractOperatorWithNestedPlans opWithNestedPlans) throws AlgebricksException { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) opWithNestedPlans.getInputs().get(0).getValue(); + if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) { + return false; + } + AssignOperator assignOp = (AssignOperator) op2; + VarExprSubstitution ves = new VarExprSubstitution(assignOp.getVariables(), assignOp.getExpressions()); + return inlineVariables(aggOp, ves); + } + + private boolean inlineVariables(AggregateOperator aggOp, VarExprSubstitution ves) throws AlgebricksException { + boolean inlined = false; + for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) { ILogicalExpression expr = exprRef.getValue(); Pair<Boolean, ILogicalExpression> p = expr.accept(ves, null); - if (p.first == true) { + if (p.first) { exprRef.setValue(p.second); + inlined = true; } - // AbstractLogicalExpression ale = (AbstractLogicalExpression) expr; - // ale.accept(ves, null); } - List<Mutable<ILogicalOperator>> op1InpList = op1.getInputs(); - op1InpList.clear(); - op1InpList.add(op2.getInputs().get(0)); - return true; + return inlined; } private class VarExprSubstitution extends AbstractConstVarFunVisitor<Pair<Boolean, ILogicalExpression>, Void> { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java index 5d6237a..729d6f9 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java @@ -39,8 +39,8 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExp import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; @@ -157,9 +157,9 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { } // Descend into subplan - if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) { - SubplanOperator subplanOp = (SubplanOperator) op; - for 
(ILogicalPlan nestedPlan : subplanOp.getNestedPlans()) { + if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN || op.getOperatorTag() == LogicalOperatorTag.WINDOW) { + List<ILogicalPlan> nestedPlans = ((AbstractOperatorWithNestedPlans) op).getNestedPlans(); + for (ILogicalPlan nestedPlan : nestedPlans) { for (Mutable<ILogicalOperator> root : nestedPlan.getRoots()) { if (inlineVariables(root, context)) { modified = true; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java new file mode 100644 index 0000000..59272ec --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.rewriter.rules; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismOperatorVisitor; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +/** + * If two adjacent window operators compute the same running aggregate then replace the second computation with + * the assign operator referring to the first operator's output variable: + * + * <pre> + * window [$x] <- [f()] ... + * window [$y] <- [f()] ... + * --> + * assign [$x] <- [$y] + * window [] <- [] ... + * window [$y] <- [f()] ... + * </pre> + * + * Both window operators must have the same partitioning specification. 
+ * + * This rule must be followed by {@link RemoveRedundantVariablesRule} to substitute {@code $x} references with + * {@code $y} and then {@link RemoveUnusedAssignAndAggregateRule} to eliminate the new assign operator. + */ +public class ReuseWindowAggregateRule implements IAlgebraicRewriteRule { + @Override + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { + return false; + } + + @Override + public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue(); + if (op1.getOperatorTag() != LogicalOperatorTag.WINDOW) { + return false; + } + WindowOperator winOp1 = (WindowOperator) op1; + + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); + if (op2.getOperatorTag() != LogicalOperatorTag.WINDOW) { + return false; + } + + WindowOperator winOp2 = (WindowOperator) op2; + + if (!IsomorphismOperatorVisitor.compareWindowPartitionSpec(winOp1, winOp2)) { + return false; + } + + List<LogicalVariable> assignVars = new ArrayList<>(); + List<Mutable<ILogicalExpression>> assignExprs = new ArrayList<>(); + + List<LogicalVariable> varsOp1 = winOp1.getVariables(); + List<LogicalVariable> varsOp2 = winOp2.getVariables(); + List<Mutable<ILogicalExpression>> exprsOp1 = winOp1.getExpressions(); + List<Mutable<ILogicalExpression>> exprsOp2 = winOp2.getExpressions(); + Iterator<LogicalVariable> varsOp1Iter = varsOp1.iterator(); + Iterator<Mutable<ILogicalExpression>> exprsOp1Iter = exprsOp1.iterator(); + while (varsOp1Iter.hasNext()) { + LogicalVariable varOp1 = varsOp1Iter.next(); + Mutable<ILogicalExpression> exprOp1 = exprsOp1Iter.next(); + VariableReferenceExpression varOp2Ref = + OperatorManipulationUtil.findAssignedVariable(varsOp2, exprsOp2, exprOp1.getValue()); + if (varOp2Ref != null) { + varsOp1Iter.remove(); + exprsOp1Iter.remove(); + assignVars.add(varOp1); + 
assignExprs.add(new MutableObject<>(varOp2Ref)); + } + } + + if (assignVars.isEmpty()) { + return false; + } + + AssignOperator assignOp = new AssignOperator(assignVars, assignExprs); + assignOp.getInputs().add(new MutableObject<>(winOp1)); + assignOp.setSourceLocation(winOp1.getSourceLocation()); + context.computeAndSetTypeEnvironmentForOperator(assignOp); + opRef.setValue(assignOp); + return true; + } +} \ No newline at end of file
