Repository: asterixdb
Updated Branches:
  refs/heads/master 8efd847db -> faf9791d2


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 764fe49..a39af41 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -624,32 +624,10 @@ public class IsomorphismOperatorVisitor implements 
ILogicalOperatorVisitor<Boole
             return false;
         }
         WindowOperator windowOpArg = (WindowOperator) copyAndSubstituteVar(op, 
arg);
-        if 
(!VariableUtilities.varListEqualUnordered(op.getPartitionExpressions(),
-                windowOpArg.getPartitionExpressions())) {
+        if (!compareWindowPartitionSpec(op, windowOpArg)) {
             return false;
         }
-        if (!compareIOrderAndExpressions(op.getOrderExpressions(), 
windowOpArg.getOrderExpressions())) {
-            return false;
-        }
-        if (!compareIOrderAndExpressions(op.getFrameValueExpressions(), 
windowOpArg.getFrameValueExpressions())) {
-            return false;
-        }
-        if (!compareExpressions(op.getFrameStartExpressions(), 
windowOpArg.getFrameStartExpressions())) {
-            return false;
-        }
-        if (!compareExpressions(op.getFrameEndExpressions(), 
windowOpArg.getFrameEndExpressions())) {
-            return false;
-        }
-        if (!compareExpressions(op.getFrameExcludeExpressions(), 
windowOpArg.getFrameExcludeExpressions())) {
-            return false;
-        }
-        if (op.getFrameExcludeNegationStartIdx() != 
windowOpArg.getFrameExcludeNegationStartIdx()) {
-            return false;
-        }
-        if (!Objects.equals(op.getFrameOffset().getValue(), 
windowOpArg.getFrameOffset().getValue())) {
-            return false;
-        }
-        if (op.getFrameMaxObjects() != windowOpArg.getFrameMaxObjects()) {
+        if (!compareWindowFrameSpec(op, windowOpArg)) {
             return false;
         }
         if 
(!VariableUtilities.varListEqualUnordered(getPairList(op.getVariables(), 
op.getExpressions()),
@@ -662,36 +640,52 @@ public class IsomorphismOperatorVisitor implements 
ILogicalOperatorVisitor<Boole
         return isomorphic;
     }
 
-    private Boolean compareExpressions(List<Mutable<ILogicalExpression>> 
opExprs,
+    public static boolean compareWindowPartitionSpec(WindowOperator winOp1, 
WindowOperator winOp2) {
+        return 
VariableUtilities.varListEqualUnordered(winOp1.getPartitionExpressions(),
+                winOp2.getPartitionExpressions())
+                && compareIOrderAndExpressions(winOp1.getOrderExpressions(), 
winOp2.getOrderExpressions());
+    }
+
+    public static boolean compareWindowFrameSpec(WindowOperator winOp1, 
WindowOperator winOp2) {
+        return compareIOrderAndExpressions(winOp1.getFrameValueExpressions(), 
winOp2.getFrameValueExpressions())
+                && compareExpressions(winOp1.getFrameStartExpressions(), 
winOp2.getFrameStartExpressions())
+                && compareExpressions(winOp1.getFrameEndExpressions(), 
winOp2.getFrameEndExpressions())
+                && compareExpressions(winOp1.getFrameExcludeExpressions(), 
winOp2.getFrameExcludeExpressions())
+                && winOp1.getFrameExcludeNegationStartIdx() == 
winOp2.getFrameExcludeNegationStartIdx()
+                && Objects.equals(winOp1.getFrameOffset().getValue(), 
winOp2.getFrameOffset().getValue())
+                && winOp1.getFrameMaxObjects() == winOp2.getFrameMaxObjects();
+    }
+
+    private static boolean 
compareExpressions(List<Mutable<ILogicalExpression>> opExprs,
             List<Mutable<ILogicalExpression>> argExprs) {
         if (opExprs.size() != argExprs.size()) {
-            return Boolean.FALSE;
+            return false;
         }
         for (int i = 0; i < opExprs.size(); i++) {
             boolean isomorphic = 
opExprs.get(i).getValue().equals(argExprs.get(i).getValue());
             if (!isomorphic) {
-                return Boolean.FALSE;
+                return false;
             }
         }
-        return Boolean.TRUE;
+        return true;
     }
 
-    private Boolean compareIOrderAndExpressions(List<Pair<IOrder, 
Mutable<ILogicalExpression>>> opOrderExprs,
+    private static boolean compareIOrderAndExpressions(List<Pair<IOrder, 
Mutable<ILogicalExpression>>> opOrderExprs,
             List<Pair<IOrder, Mutable<ILogicalExpression>>> argOrderExprs) {
         if (opOrderExprs.size() != argOrderExprs.size()) {
-            return Boolean.FALSE;
+            return false;
         }
         for (int i = 0; i < opOrderExprs.size(); i++) {
             boolean isomorphic = 
opOrderExprs.get(i).first.equals(argOrderExprs.get(i).first);
             if (!isomorphic) {
-                return Boolean.FALSE;
+                return false;
             }
             isomorphic = 
opOrderExprs.get(i).second.getValue().equals(argOrderExprs.get(i).second.getValue());
             if (!isomorphic) {
-                return Boolean.FALSE;
+                return false;
             }
         }
-        return Boolean.TRUE;
+        return true;
     }
 
     private boolean compareSubplans(List<ILogicalPlan> plans, 
List<ILogicalPlan> plansArg) throws AlgebricksException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 5591053..eca2508 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -99,7 +99,7 @@ public class IsomorphismVariableMappingVisitor implements 
ILogicalOperatorVisito
     @Override
     public Void visitWindowOperator(WindowOperator op, ILogicalOperator arg) 
throws AlgebricksException {
         mapChildren(op, arg);
-        mapVariablesForAbstractAssign(op, arg);
+        mapVariablesForWindow(op, arg);
         mapVariablesInNestedPlans(op, arg);
         return null;
     }
@@ -359,7 +359,7 @@ public class IsomorphismVariableMappingVisitor implements 
ILogicalOperatorVisito
                 rightOp.getExpressions());
     }
 
-    private void mapVariablesForGroupBy(ILogicalOperator left, 
ILogicalOperator right) throws AlgebricksException {
+    private void mapVariablesForGroupBy(ILogicalOperator left, 
ILogicalOperator right) {
         if (left.getOperatorTag() != right.getOperatorTag()) {
             return;
         }
@@ -373,6 +373,16 @@ public class IsomorphismVariableMappingVisitor implements 
ILogicalOperatorVisito
         mapVarExprPairList(leftPairs, rightPairs);
     }
 
+    private void mapVariablesForWindow(ILogicalOperator left, ILogicalOperator 
right) {
+        if (left.getOperatorTag() != right.getOperatorTag()) {
+            return;
+        }
+        WindowOperator leftOp = (WindowOperator) left;
+        WindowOperator rightOp = (WindowOperator) right;
+        mapVariablesForAbstractAssign(leftOp.getVariables(), 
leftOp.getExpressions(), rightOp.getVariables(),
+                rightOp.getExpressions());
+    }
+
     private void mapVarExprPairList(List<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>> leftPairs,
             List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
rightPairs) {
         if (leftPairs.size() != rightPairs.size()) {
@@ -420,6 +430,9 @@ public class IsomorphismVariableMappingVisitor implements 
ILogicalOperatorVisito
 
     private void mapVariablesInNestedPlans(AbstractOperatorWithNestedPlans op, 
ILogicalOperator arg)
             throws AlgebricksException {
+        if (op.getOperatorTag() != arg.getOperatorTag()) {
+            return;
+        }
         AbstractOperatorWithNestedPlans argOp = 
(AbstractOperatorWithNestedPlans) arg;
         List<ILogicalPlan> plans = op.getNestedPlans();
         List<ILogicalPlan> plansArg = argOp.getNestedPlans();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 4d05a1b..967ad6f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -36,6 +36,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -407,4 +408,25 @@ public class OperatorManipulationUtil {
         }
         return clonedExprList;
     }
+
+    /**
+     * Finds a variable assigned to a given expression and returns a new 
{@link VariableReferenceExpression}
+     * referring to this variable.
+     * @param assignVarList list of variables
+     * @param assignExprList list of expressions assigned to those variables
+     * @param searchExpr expression to search for
+     * @return the new variable reference, or {@code null} if no variable is assigned to {@code searchExpr}
+     */
+    public static VariableReferenceExpression 
findAssignedVariable(List<LogicalVariable> assignVarList,
+            List<Mutable<ILogicalExpression>> assignExprList, 
ILogicalExpression searchExpr) {
+        for (int i = 0, n = assignExprList.size(); i < n; i++) {
+            ILogicalExpression expr = assignExprList.get(i).getValue();
+            if (expr.equals(searchExpr)) {
+                VariableReferenceExpression result = new 
VariableReferenceExpression(assignVarList.get(i));
+                result.setSourceLocation(expr.getSourceLocation());
+                return result;
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
new file mode 100644
index 0000000..61880a2
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismOperatorVisitor;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Merges two adjacent window operators into one if their window 
specifications are compatible.
+ * <pre>
+ * window [$x] <- [f()] with nested plan (aggregate [$a] <- [agg_1()] - ... - 
nts )
+ * window [$y] <- [g()] with nested plan (aggregate [$b] <- [agg_2()] - ... - 
nts )
+ * -->
+ * window [$x, $y] <- [f(), g()] with nested plan ( aggregate [$a, $b] <- 
[agg_1(), agg_2()] - ... - nts )
+ * </pre>
+ */
+public class ConsolidateWindowOperatorsRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) 
opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.WINDOW) {
+            return false;
+        }
+        WindowOperator winOp1 = (WindowOperator) op1;
+
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.WINDOW) {
+            return false;
+        }
+
+        WindowOperator winOp2 = (WindowOperator) op2;
+
+        if (!IsomorphismOperatorVisitor.compareWindowPartitionSpec(winOp1, 
winOp2)) {
+            return false;
+        }
+        if (winOp1.hasNestedPlans() && winOp2.hasNestedPlans()
+                && !IsomorphismOperatorVisitor.compareWindowFrameSpec(winOp1, 
winOp2)) {
+            return false;
+        }
+
+        Set<LogicalVariable> used1 = new HashSet<>();
+        VariableUtilities.getUsedVariables(winOp1, used1);
+        if (!OperatorPropertiesUtil.disjoint(winOp2.getVariables(), used1)) {
+            return false;
+        }
+
+        if (winOp2.hasNestedPlans() && !consolidateNestedPlans(winOp1, winOp2, 
context)) {
+            return false;
+        }
+
+        winOp1.getVariables().addAll(winOp2.getVariables());
+        winOp1.getExpressions().addAll(winOp2.getExpressions());
+
+        winOp1.getInputs().clear();
+        winOp1.getInputs().addAll(winOp2.getInputs());
+        context.computeAndSetTypeEnvironmentForOperator(winOp1);
+
+        return true;
+    }
+
+    private boolean consolidateNestedPlans(WindowOperator winOpTo, 
WindowOperator winOpFrom,
+            IOptimizationContext context) throws AlgebricksException {
+        if (winOpTo.hasNestedPlans()) {
+            AggregateOperator aggTo = 
getAggregateRoot(winOpTo.getNestedPlans());
+            if (aggTo == null) {
+                return false;
+            }
+            AggregateOperator aggFrom = 
getAggregateRoot(winOpFrom.getNestedPlans());
+            if (aggFrom == null) {
+                return false;
+            }
+            if 
(!IsomorphismUtilities.isOperatorIsomorphicPlanSegment(aggTo.getInputs().get(0).getValue(),
+                    aggFrom.getInputs().get(0).getValue())) {
+                return false;
+            }
+            aggTo.getVariables().addAll(aggFrom.getVariables());
+            aggTo.getExpressions().addAll(aggFrom.getExpressions());
+            context.computeAndSetTypeEnvironmentForOperator(aggTo);
+        } else {
+            setAll(winOpTo.getNestedPlans(), winOpFrom.getNestedPlans());
+            setAll(winOpTo.getFrameValueExpressions(), 
winOpFrom.getFrameValueExpressions());
+            setAll(winOpTo.getFrameStartExpressions(), 
winOpFrom.getFrameStartExpressions());
+            setAll(winOpTo.getFrameEndExpressions(), 
winOpFrom.getFrameEndExpressions());
+            setAll(winOpTo.getFrameExcludeExpressions(), 
winOpFrom.getFrameExcludeExpressions());
+            
winOpTo.setFrameExcludeNegationStartIdx(winOpFrom.getFrameExcludeNegationStartIdx());
+            
winOpTo.getFrameOffset().setValue(winOpFrom.getFrameOffset().getValue());
+            winOpTo.setFrameMaxObjects(winOpFrom.getFrameMaxObjects());
+        }
+        return true;
+    }
+
+    private AggregateOperator getAggregateRoot(List<ILogicalPlan> nestedPlans) 
{
+        if (nestedPlans.size() != 1) {
+            return null;
+        }
+        List<Mutable<ILogicalOperator>> roots = nestedPlans.get(0).getRoots();
+        if (roots.size() != 1) {
+            return null;
+        }
+        ILogicalOperator rootOp = roots.get(0).getValue();
+        if (rootOp.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return null;
+        }
+        return (AggregateOperator) rootOp;
+    }
+
+    private <T> void setAll(Collection<? super T> to, Collection<? extends T> 
from) {
+        if (!to.isEmpty()) {
+            throw new IllegalStateException(String.valueOf(to.size()));
+        }
+        to.addAll(from);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java
index d0dff03..a97c38b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineAssignIntoAggregateRule.java
@@ -34,9 +34,9 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.AbstractConstVarFunVisitor;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -51,46 +51,69 @@ public class InlineAssignIntoAggregateRule implements 
IAlgebraicRewriteRule {
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP && 
op.getOperatorTag() != LogicalOperatorTag.WINDOW) {
             return false;
         }
         boolean changed = false;
-        GroupByOperator gbyOp = (GroupByOperator) op;
-        for (ILogicalPlan p : gbyOp.getNestedPlans()) {
+        AbstractOperatorWithNestedPlans opWithNestedPlan = 
(AbstractOperatorWithNestedPlans) op;
+        for (ILogicalPlan p : opWithNestedPlan.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                if (inlined(r)) {
-                    changed = true;
-                }
+                changed |= inlined(r.getValue(), opWithNestedPlan);
             }
         }
         return changed;
     }
 
-    private boolean inlined(Mutable<ILogicalOperator> r) throws 
AlgebricksException {
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) r.getValue();
+    private boolean inlined(ILogicalOperator planRootOp, 
AbstractOperatorWithNestedPlans opWithNestedPlan)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) planRootOp;
         if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
             return false;
         }
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
+        AggregateOperator aggOp = (AggregateOperator) op1;
+        boolean inlined = inlineInputAssignIntoAgg(aggOp);
+        if (opWithNestedPlan.getOperatorTag() == LogicalOperatorTag.WINDOW) {
+            inlined |= inlineOuterInputAssignIntoAgg(aggOp, opWithNestedPlan);
+        }
+        return inlined;
+    }
+
+    private boolean inlineInputAssignIntoAgg(AggregateOperator aggOp) throws 
AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
aggOp.getInputs().get(0).getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
             return false;
         }
-        AggregateOperator agg = (AggregateOperator) op1;
-        AssignOperator assign = (AssignOperator) op2;
-        VarExprSubstitution ves = new 
VarExprSubstitution(assign.getVariables(), assign.getExpressions());
-        for (Mutable<ILogicalExpression> exprRef : agg.getExpressions()) {
+        AssignOperator assignOp = (AssignOperator) op2;
+        VarExprSubstitution ves = new 
VarExprSubstitution(assignOp.getVariables(), assignOp.getExpressions());
+        inlineVariables(aggOp, ves);
+        List<Mutable<ILogicalOperator>> op1InpList = aggOp.getInputs();
+        op1InpList.clear();
+        op1InpList.add(op2.getInputs().get(0));
+        return true;
+    }
+
+    private boolean inlineOuterInputAssignIntoAgg(AggregateOperator aggOp,
+            AbstractOperatorWithNestedPlans opWithNestedPlans) throws 
AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
opWithNestedPlans.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AssignOperator assignOp = (AssignOperator) op2;
+        VarExprSubstitution ves = new 
VarExprSubstitution(assignOp.getVariables(), assignOp.getExpressions());
+        return inlineVariables(aggOp, ves);
+    }
+
+    private boolean inlineVariables(AggregateOperator aggOp, 
VarExprSubstitution ves) throws AlgebricksException {
+        boolean inlined = false;
+        for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
             ILogicalExpression expr = exprRef.getValue();
             Pair<Boolean, ILogicalExpression> p = expr.accept(ves, null);
-            if (p.first == true) {
+            if (p.first) {
                 exprRef.setValue(p.second);
+                inlined = true;
             }
-            // AbstractLogicalExpression ale = (AbstractLogicalExpression) 
expr;
-            // ale.accept(ves, null);
         }
-        List<Mutable<ILogicalOperator>> op1InpList = op1.getInputs();
-        op1InpList.clear();
-        op1InpList.add(op2.getInputs().get(0));
-        return true;
+        return inlined;
     }
 
     private class VarExprSubstitution extends 
AbstractConstVarFunVisitor<Pair<Boolean, ILogicalExpression>, Void> {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 5d6237a..729d6f9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -39,8 +39,8 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExp
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -157,9 +157,9 @@ public class InlineVariablesRule implements 
IAlgebraicRewriteRule {
         }
 
         // Descend into subplan
-        if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-            SubplanOperator subplanOp = (SubplanOperator) op;
-            for (ILogicalPlan nestedPlan : subplanOp.getNestedPlans()) {
+        if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN || 
op.getOperatorTag() == LogicalOperatorTag.WINDOW) {
+            List<ILogicalPlan> nestedPlans = 
((AbstractOperatorWithNestedPlans) op).getNestedPlans();
+            for (ILogicalPlan nestedPlan : nestedPlans) {
                 for (Mutable<ILogicalOperator> root : nestedPlan.getRoots()) {
                     if (inlineVariables(root, context)) {
                         modified = true;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/faf9791d/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java
new file mode 100644
index 0000000..59272ec
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ReuseWindowAggregateRule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismOperatorVisitor;
+import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * If two adjacent window operators compute the same running aggregate then 
replace the second computation with
+ * an assign operator referring to the first operator's output variable:
+ *
+ * <pre>
+ * window [$x] <- [f()] ...
+ * window [$y] <- [f()] ...
+ * -->
+ * assign [$x] <- [$y]
+ * window [] <- [] ...
+ * window [$y] <- [f()] ...
+ * </pre>
+ *
+ * Both window operators must have the same partitioning and ordering specification.
+ *
+ * This rule must be followed by {@link RemoveRedundantVariablesRule} to 
substitute {@code $x} references with
+ * {@code $y} and then {@link RemoveUnusedAssignAndAggregateRule} to eliminate 
the new assign operator.
+ */
+public class ReuseWindowAggregateRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) 
opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.WINDOW) {
+            return false;
+        }
+        WindowOperator winOp1 = (WindowOperator) op1;
+
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.WINDOW) {
+            return false;
+        }
+
+        WindowOperator winOp2 = (WindowOperator) op2;
+
+        if (!IsomorphismOperatorVisitor.compareWindowPartitionSpec(winOp1, 
winOp2)) {
+            return false;
+        }
+
+        List<LogicalVariable> assignVars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> assignExprs = new ArrayList<>();
+
+        List<LogicalVariable> varsOp1 = winOp1.getVariables();
+        List<LogicalVariable> varsOp2 = winOp2.getVariables();
+        List<Mutable<ILogicalExpression>> exprsOp1 = winOp1.getExpressions();
+        List<Mutable<ILogicalExpression>> exprsOp2 = winOp2.getExpressions();
+        Iterator<LogicalVariable> varsOp1Iter = varsOp1.iterator();
+        Iterator<Mutable<ILogicalExpression>> exprsOp1Iter = 
exprsOp1.iterator();
+        while (varsOp1Iter.hasNext()) {
+            LogicalVariable varOp1 = varsOp1Iter.next();
+            Mutable<ILogicalExpression> exprOp1 = exprsOp1Iter.next();
+            VariableReferenceExpression varOp2Ref =
+                    OperatorManipulationUtil.findAssignedVariable(varsOp2, 
exprsOp2, exprOp1.getValue());
+            if (varOp2Ref != null) {
+                varsOp1Iter.remove();
+                exprsOp1Iter.remove();
+                assignVars.add(varOp1);
+                assignExprs.add(new MutableObject<>(varOp2Ref));
+            }
+        }
+
+        if (assignVars.isEmpty()) {
+            return false;
+        }
+
+        AssignOperator assignOp = new AssignOperator(assignVars, assignExprs);
+        assignOp.getInputs().add(new MutableObject<>(winOp1));
+        assignOp.setSourceLocation(winOp1.getSourceLocation());
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        opRef.setValue(assignOp);
+        return true;
+    }
+}
\ No newline at end of file

Reply via email to