http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index 2b5e569..d91a255 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -69,6 +69,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; @@ -609,7 +610,32 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole @Override public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException { - return true; + AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; + if (aop.getOperatorTag() != LogicalOperatorTag.SINK) { + return Boolean.FALSE; + } + return Boolean.TRUE; + } + + @Override + public Boolean visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException { + AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; + if (aop.getOperatorTag() != LogicalOperatorTag.WINDOW) { + return Boolean.FALSE; + } + WindowOperator windowOpArg = (WindowOperator) copyAndSubstituteVar(op, arg); + if (!VariableUtilities.varListEqualUnordered(op.getPartitionExpressions(), + windowOpArg.getPartitionExpressions())) { + return Boolean.FALSE; + } + if (!compareIOrderAndExpressions(op.getOrderExpressions(), windowOpArg.getOrderExpressions())) { + return Boolean.FALSE; + } + if (!VariableUtilities.varListEqualUnordered(getPairList(op.getVariables(), op.getExpressions()), + getPairList(windowOpArg.getVariables(), windowOpArg.getExpressions()))) { + return Boolean.FALSE; + } + return Boolean.TRUE; } private Boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java index 742d485..d0aec16 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java @@ -68,6 +68,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -96,6 +97,13 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito } @Override + public Void visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException { + mapChildren(op, arg); + mapVariablesForAbstractAssign(op, arg); + return null; + } + + @Override public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalOperator arg) throws AlgebricksException { mapVariablesStandard(op, arg); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index 0196db6..198ffdc 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; @@ -611,6 +612,20 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor return opCopy; } + @Override + public ILogicalOperator visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException { + List<Mutable<ILogicalExpression>> partitionExprCopy = + exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getPartitionExpressions()); + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprCopy = + deepCopyOrderExpressionReferencePairList(op.getOrderExpressions()); + List<LogicalVariable> varCopy = deepCopyVariableList(op.getVariables()); + List<Mutable<ILogicalExpression>> exprCopy = + exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()); + WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, varCopy, exprCopy); + deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); + return opCopy; + } + public LinkedHashMap<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() { return inputVarToOutputVarMapping; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java index 7d3d676..0aaa529 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java @@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; @@ -205,6 +206,12 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I } @Override + public Void visitWindowOperator(WindowOperator op, IOptimizationContext context) throws AlgebricksException { + visitAssignment(op, context); + return null; + } + + @Override public Void visitScriptOperator(ScriptOperator op, IOptimizationContext arg) throws AlgebricksException { // TODO Auto-generated method stub return null; @@ -367,5 +374,4 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I throws AlgebricksException { return null; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index c6f0c14..e5ca646 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; @@ -406,4 +407,19 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical return new LeftOuterUnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()), op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter()); } + + @Override + public ILogicalOperator visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + List<Mutable<ILogicalExpression>> newPartitionExprs = new ArrayList<>(); + deepCopyExpressionRefs(op.getPartitionExpressions(), newPartitionExprs); + List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderExprs = + deepCopyOrderAndExpression(op.getOrderExpressions()); + + ArrayList<LogicalVariable> newList = new ArrayList<>(); + ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>(); + newList.addAll(op.getVariables()); + deepCopyExpressionRefs(newExpressions, op.getExpressions()); + + return new WindowOperator(newPartitionExprs, newOrderExprs, newList, newExpressions); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java index f36f604..eb90288 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java @@ -62,6 +62,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; @@ -289,4 +290,8 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void, return null; } + @Override + public Void visitWindowOperator(WindowOperator op, IOptimizationContext arg) throws AlgebricksException { + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java index 5d0ef6a..43b7c80 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java @@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -91,6 +92,12 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo } @Override + public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + producedVariables.addAll(op.getVariables()); + return null; + } + + @Override public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException { producedVariables.addAll(op.getVariables()); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java index 70ccf6d..69b17ed 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java @@ -64,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -343,4 +344,9 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void return null; } + @Override + public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + standardLayout(op); + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java index c62f555..99d3488 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java @@ -65,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; @@ -85,15 +86,7 @@ public class SubstituteVariableVisitor @Override public Void visitAggregateOperator(AggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { - List<LogicalVariable> variables = op.getVariables(); - int n = variables.size(); - for (int i = 0; i < n; i++) { - if (variables.get(i).equals(pair.first)) { - variables.set(i, pair.second); - } else { - op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second); - } - } + substAssignVariables(op.getVariables(), op.getExpressions(), pair); substVarTypes(op, pair); return null; } @@ -101,15 +94,7 @@ public class SubstituteVariableVisitor @Override public Void visitAssignOperator(AssignOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { - List<LogicalVariable> variables = op.getVariables(); - int n = variables.size(); - for (int i = 0; i < n; i++) { - if (variables.get(i).equals(pair.first)) { - variables.set(i, pair.second); - } else { - op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second); - } - } + substAssignVariables(op.getVariables(), op.getExpressions(), pair); // Substitute variables stored in ordering property if (op.getExplicitOrderingProperty() != null) { List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns(); @@ -134,10 +119,10 @@ public class SubstituteVariableVisitor return null; } } - substVarTypes(op, pair); if (op.getSelectCondition() != null) { op.getSelectCondition().getValue().substituteVar(pair.first, pair.second); } + substVarTypes(op, pair); return null; } @@ -240,15 +225,7 @@ public class SubstituteVariableVisitor @Override public Void visitRunningAggregateOperator(RunningAggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { - List<LogicalVariable> variables = op.getVariables(); - int n = variables.size(); - for (int i = 0; i < n; i++) { - if (variables.get(i).equals(pair.first)) { - variables.set(i, pair.second); - } else { - op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second); - } - } + substAssignVariables(op.getVariables(), op.getExpressions(), pair); substVarTypes(op, pair); return null; } @@ -403,6 +380,18 @@ public class SubstituteVariableVisitor } } + private void substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions, + Pair<LogicalVariable, LogicalVariable> pair) { + int n = variables.size(); + for (int i = 0; i < n; i++) { + if (variables.get(i).equals(pair.first)) { + variables.set(i, pair.second); + } else { + expressions.get(i).getValue().substituteVar(pair.first, pair.second); + } + } + } + @Override public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> arg) throws AlgebricksException { @@ -510,4 +499,18 @@ public class SubstituteVariableVisitor substVarTypes(op, pair); return null; } + + @Override + public Void visitWindowOperator(WindowOperator op, Pair<LogicalVariable, LogicalVariable> pair) + throws AlgebricksException { + for (Mutable<ILogicalExpression> pe : op.getPartitionExpressions()) { + pe.getValue().substituteVar(pair.first, pair.second); + } + for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { + oe.second.getValue().substituteVar(pair.first, pair.second); + } + substAssignVariables(op.getVariables(), op.getExpressions(), pair); + substVarTypes(op, pair); + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index 2c68697..b4bea84 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator; @@ -471,4 +472,17 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> return null; } + @Override + public Void visitWindowOperator(WindowOperator op, Void arg) { + for (Mutable<ILogicalExpression> exprRef : op.getPartitionExpressions()) { + exprRef.getValue().getUsedVariables(usedVariables); + } + for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { + oe.second.getValue().getUsedVariables(usedVariables); + } + for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) { + exprRef.getValue().getUsedVariables(usedVariables); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java index d68be20..09ee358 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java @@ -40,7 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.aggrun.RunningAggregateRuntimeFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; public class RunningAggregatePOperator extends AbstractPhysicalOperator { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java new file mode 100644 index 0000000..7853524 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.ListSet; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.aggrun.WindowRuntimeFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; + +public class WindowPOperator extends AbstractPhysicalOperator { + + private final List<LogicalVariable> partitionColumns; + + private final boolean partitionMaterialization; + + private final List<OrderColumn> orderColumns; + + public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization, + List<OrderColumn> orderColumns) { + this.partitionColumns = partitionColumns; + this.partitionMaterialization = partitionMaterialization; + this.orderColumns = orderColumns; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.WINDOW; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException { + IPartitioningProperty pp; + switch (op.getExecutionMode()) { + case PARTITIONED: + pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns), + context.getComputationNodeDomain()); + break; + case UNPARTITIONED: + pp = IPartitioningProperty.UNPARTITIONED; + break; + case LOCAL: + pp = null; + break; + default: + throw new IllegalStateException(op.getExecutionMode().name()); + } + + // require local order property [pc1, ... pcN, oc1, ... ocN] + // accounting for cases where there's an overlap between order and partition columns + // TODO replace with required local grouping on partition columns + local order on order columns + List<OrderColumn> lopColumns = new ArrayList<>(); + ListSet<LogicalVariable> pcVars = new ListSet<>(); + pcVars.addAll(partitionColumns); + for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) { + OrderColumn oc = orderColumns.get(oIdx); + LogicalVariable ocVar = oc.getColumn(); + if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) { + throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC, + op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns)); + } + lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder())); + } + int pIdx = 0; + for (LogicalVariable pColumn : pcVars) { + lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC)); + } + List<ILocalStructuralProperty> localProps = Collections.singletonList(new LocalOrderProperty(lopColumns)); + + return new PhysicalRequirements( + new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) }, + IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + WindowOperator winOp = (WindowOperator) op; + int[] outColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables()); + List<Mutable<ILogicalExpression>> expressions = winOp.getExpressions(); + IRunningAggregateEvaluatorFactory[] winFuncs = new IRunningAggregateEvaluatorFactory[expressions.size()]; + IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); + for (int i = 0; i < winFuncs.length; i++) { + StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) expressions.get(i).getValue(); + winFuncs[i] = expressionRuntimeProvider.createRunningAggregateFunctionFactory(expr, + context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context); + } + + // TODO push projections into the operator + int[] projectionList = JobGenHelper.projectAllVariables(opSchema); + + int[] partitionColumnList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns); + + IBinaryComparatorFactory[] partitionComparatorFactories = JobGenHelper + .variablesToAscBinaryComparatorFactories(partitionColumns, context.getTypeEnvironment(op), context); + + //TODO not all functions need order comparators + IBinaryComparatorFactory[] orderComparatorFactories = JobGenHelper + .variablesToBinaryComparatorFactories(orderColumns, context.getTypeEnvironment(op), context); + + WindowRuntimeFactory runtime = new WindowRuntimeFactory(outColumns, winFuncs, projectionList, + partitionColumnList, partitionComparatorFactories, partitionMaterialization, orderComparatorFactories); + runtime.setSourceLocation(winOp.getSourceLocation()); + + // contribute one Asterix framewriter + RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + builder.contributeMicroOperator(winOp, runtime, recDesc); + // and contribute one edge from its child + ILogicalOperator src = winOp.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, winOp, 0); + } + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public boolean expensiveThanMaterialization() { + return true; + } + + public boolean isPartitionMaterialization() { + return partitionMaterialization; + } + + private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) { + for (int i = startIdx, ln = ocList.size(); i < ln; i++) { + if (varSet.contains(ocList.get(i).getColumn())) { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index 77f052e..ad45614 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor; @@ -170,13 +171,10 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr @Override public Void visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("order "); - for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { - if (op.getTopK() != -1) { - buffer.append("(topK: " + op.getTopK() + ") "); - } - String fst = getOrderString(p.first); - buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") "); + if (op.getTopK() != -1) { + buffer.append("(topK: " + op.getTopK() + ") "); } + pprintOrderList(op.getOrderExpressions(), indent); return null; } @@ -484,6 +482,17 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr return null; } + @Override + public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException { + addIndent(indent).append("window ").append(str(op.getVariables())).append(" <- "); + pprintExprList(op.getExpressions(), indent); + buffer.append(" partition "); + pprintExprList(op.getPartitionExpressions(), indent); + buffer.append(" order "); + pprintOrderList(op.getOrderExpressions(), indent); + return null; + } + protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent) throws AlgebricksException { boolean first = true; if (op.getNestedPlans().isEmpty()) { @@ -537,4 +546,12 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr } buffer.append("]"); } + + protected void pprintOrderList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderList, + Integer indent) throws AlgebricksException { + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderList) { + String fst = getOrderString(p.first); + buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") "); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index 4a17cc6..4c810ab 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@ -70,6 +70,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; @@ -251,30 +252,12 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat @Override public Void visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("\"operator\": \"order\""); - for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { - buffer.append(",\n"); - if (op.getTopK() != -1) { - addIndent(indent).append("\"topK\": \"" + op.getTopK() + "\",\n"); - } - String fst = getOrderString(p.first); - addIndent(indent).append("\"first\": " + fst + ",\n"); - addIndent(indent).append( - "\"second\": \"" + p.second.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\""); - } + int topK = op.getTopK(); + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions(); + pprintOrderExprList(orderExpressions, topK, indent); return null; } - private String getOrderString(OrderOperator.IOrder first) { - switch (first.getKind()) { - case ASC: - return "\"ASC\""; - case DESC: - return "\"DESC\""; - default: - return first.getExpressionRef().toString(); - } - } - @Override public Void visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("\"operator\": \"assign\""); @@ -667,6 +650,23 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat return null; } + @Override + public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException { + addIndent(indent).append("\"operator\": \"window\""); + variablePrintHelper(op.getVariables(), indent); + addIndent(0).append(",\n"); + pprintExprList(op.getExpressions(), indent); + if (!op.getPartitionExpressions().isEmpty()) { + buffer.append(",\n"); + addIndent(indent).append("\"partition by\": "); + pprintExprList(op.getPartitionExpressions(), indent); + } + buffer.append(",\n"); + addIndent(indent).append("\"order by\": "); + pprintOrderExprList(op.getOrderExpressions(), -1, indent); + return null; + } + protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent) throws AlgebricksException { idCounter.nextPrefix(); buffer.append("[\n"); @@ -718,4 +718,29 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat } buffer.append("]"); } + + private void pprintOrderExprList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions, + int topK, Integer indent) throws AlgebricksException { + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) { + buffer.append(",\n"); + if (topK != -1) { + addIndent(indent).append("\"topK\": \"" + topK + "\",\n"); + } + String fst = getOrderString(p.first); + addIndent(indent).append("\"first\": " + fst + ",\n"); + addIndent(indent).append( + "\"second\": \"" + p.second.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\""); + } + } + + private String getOrderString(OrderOperator.IOrder first) { + switch (first.getKind()) { + case ASC: + return "\"ASC\""; + case DESC: + return "\"DESC\""; + default: + return first.getExpressionRef().toString(); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index 67199b9..ea914fa 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; @@ -134,6 +135,13 @@ public class OperatorManipulationUtil { forceUnpartitioned = true; } } + if (op.getOperatorTag() == LogicalOperatorTag.WINDOW) { + WindowOperator winOp = (WindowOperator) op; + if (winOp.getPartitionExpressions().isEmpty()) { + op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED); + forceUnpartitioned = true; + } + } for (Mutable<ILogicalOperator> i : op.getInputs()) { boolean exit = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java index 548a29f..9d5cdeb 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java @@ -52,6 +52,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; @@ -126,4 +127,6 @@ public interface ILogicalOperatorVisitor<R, T> { public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException; public R visitForwardOperator(ForwardOperator op, T arg) throws AlgebricksException; + + public R visitWindowOperator(WindowOperator op, T arg) throws AlgebricksException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java index b204bcb..5142ce7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java @@ -25,6 +25,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider; import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider; @@ -144,6 +146,20 @@ public final class JobGenHelper { return compFactories; } + public static IBinaryComparatorFactory[] variablesToBinaryComparatorFactories(Collection<OrderColumn> orderColumns, + IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException { + IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[orderColumns.size()]; + IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider(); + int i = 0; + for (OrderColumn oc : orderColumns) { + LogicalVariable v = oc.getColumn(); + boolean ascending = oc.getOrder() == OrderOperator.IOrder.OrderKind.ASC; + Object type = env.getVarType(v); + compFactories[i++] = bcfProvider.getBinaryComparatorFactory(type, ascending); + } + return compFactories; + } + public static INormalizedKeyComputerFactory variablesToAscNormalizedKeyComputerFactory( Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException { @@ -181,12 +197,20 @@ public final class JobGenHelper { } public static int[] projectAllVariables(IOperatorSchema opSchema) { - int[] projectionList = new int[opSchema.getSize()]; + return projectVariablesImpl(opSchema, opSchema, opSchema.getSize()); + } + + public static int[] projectVariables(IOperatorSchema opSchema, List<LogicalVariable> variables) { + return projectVariablesImpl(opSchema, variables, variables.size()); + } + + private static int[] projectVariablesImpl(IOperatorSchema opSchema, Iterable<LogicalVariable> variables, + int variableCount) { + int[] projectionList = new int[variableCount]; int k = 0; - for (LogicalVariable v : opSchema) { + for (LogicalVariable v : variables) { projectionList[k++] = opSchema.findVariable(v); } return projectionList; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java index 113d205..f10c3a4 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java @@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; @@ -192,17 +193,7 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String stringBuilder.append("(topK: ").append(op.getTopK()).append(") "); } stringBuilder.append("("); - switch (p.first.getKind()) { - case ASC: - stringBuilder.append("ASC"); - break; - case DESC: - stringBuilder.append("DESC"); - break; - default: - final Mutable<ILogicalExpression> expressionRef = p.first.getExpressionRef(); - stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString()); - } + appendOrder(p.first); stringBuilder.append(", ").append(p.second.getValue().toString()).append(") "); } appendSchema(op, showDetails); @@ -211,6 +202,20 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String return stringBuilder.toString(); } + private void appendOrder(OrderOperator.IOrder order) { + switch (order.getKind()) { + case ASC: + stringBuilder.append("ASC"); + break; + case DESC: + stringBuilder.append("DESC"); + break; + default: + final Mutable<ILogicalExpression> expressionRef = order.getExpressionRef(); + stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString()); + } + } + @Override public String visitAssignOperator(AssignOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); @@ -595,6 +600,26 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String return stringBuilder.toString(); } + @Override + public String visitWindowOperator(WindowOperator op, Boolean showDetails) throws AlgebricksException { + stringBuilder.setLength(0); + stringBuilder.append("window (").append(str(op.getVariables())).append(" <- "); + printExprList(op.getExpressions()); + stringBuilder.append(") partition by ("); + printExprList(op.getPartitionExpressions()); + stringBuilder.append(") order by ("); + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { + stringBuilder.append("("); + appendOrder(p.first); + stringBuilder.append(", ").append(p.second.getValue().toString()).append(") "); + } + stringBuilder.append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); + return stringBuilder.toString(); + } + private void printExprList(List<Mutable<ILogicalExpression>> expressions) { stringBuilder.append("["); expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", ")); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java index 036b3e1..7adc732 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java @@ -18,26 +18,30 @@ */ package org.apache.hyracks.algebricks.rewriter.rules; +import java.util.List; +import java.util.function.Function; +import java.util.function.Predicate; + import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; public abstract class AbstractExtractExprRule implements IAlgebraicRewriteRule { - protected LogicalVariable extractExprIntoAssignOpRef(ILogicalExpression gExpr, Mutable<ILogicalOperator> opRef2, - IOptimizationContext context) throws AlgebricksException { + protected static LogicalVariable extractExprIntoAssignOpRef(ILogicalExpression gExpr, + Mutable<ILogicalOperator> opRef2, IOptimizationContext context) throws AlgebricksException { LogicalVariable v = context.newVar(); - AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(gExpr)); + AssignOperator a = new AssignOperator(v, new MutableObject<>(gExpr)); a.setSourceLocation(gExpr.getSourceLocation()); - a.getInputs().add(new MutableObject<ILogicalOperator>(opRef2.getValue())); + a.getInputs().add(new MutableObject<>(opRef2.getValue())); opRef2.setValue(a); if (gExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT) { context.addNotToBeInlinedVar(v); @@ -46,4 +50,37 @@ public abstract class AbstractExtractExprRule implements IAlgebraicRewriteRule { return v; } + protected static <T> boolean extractComplexExpressions(ILogicalOperator op, List<T> exprList, + Function<T, Mutable<ILogicalExpression>> exprGetter, Predicate<ILogicalExpression> retainPredicate, + IOptimizationContext context) throws AlgebricksException { + if (!hasComplexExpressions(exprList, exprGetter)) { + return false; + } + boolean rewritten = false; + Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(0); + for (T item : exprList) { + Mutable<ILogicalExpression> exprMutable = exprGetter.apply(item); + ILogicalExpression expr = exprMutable.getValue(); + if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE && !retainPredicate.test(expr)) { + LogicalVariable v = extractExprIntoAssignOpRef(expr, inputOpRef, context); + VariableReferenceExpression vRef = new VariableReferenceExpression(v); + vRef.setSourceLocation(expr.getSourceLocation()); + exprMutable.setValue(vRef); + rewritten = true; + } + } + context.computeAndSetTypeEnvironmentForOperator(op); + return rewritten; + } + + private static <T> boolean hasComplexExpressions(List<T> exprList, + Function<T, Mutable<ILogicalExpression>> exprGetter) { + for (T item : exprList) { + Mutable<ILogicalExpression> exprMutable = exprGetter.apply(item); + if (exprMutable.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index cdab2f4..52b3f59 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -515,7 +515,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { case LOCAL_GROUPING_PROPERTY: { LocalGroupingProperty g = (LocalGroupingProperty) prop; Collection<LogicalVariable> vars = - (g.getPreferredOrderEnforcer() != null) ? g.getPreferredOrderEnforcer() : g.getColumnSet(); + !g.getPreferredOrderEnforcer().isEmpty() ? g.getPreferredOrderEnforcer() : g.getColumnSet(); List<OrderColumn> orderColumns = new ArrayList<>(); for (LogicalVariable v : vars) { OrderColumn oc = new OrderColumn(v, OrderKind.ASC); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java index eb2bee6..06b2e16 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java @@ -18,17 +18,17 @@ */ package org.apache.hyracks.algebricks.rewriter.rules; -import org.apache.commons.lang3.mutable.Mutable; +import java.util.List; +import java.util.function.Function; +import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; -import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; @@ -55,8 +55,8 @@ public class ExtractGbyExpressionsRule extends AbstractExtractExprRule { } context.addToDontApplySet(this, op1); GroupByOperator g = (GroupByOperator) op1; - boolean r1 = gbyExprWasRewritten(g, context); - boolean r2 = decorExprWasRewritten(g, context); + boolean r1 = extractComplexExpressions(g, g.getGroupByList(), context); + boolean r2 = extractComplexExpressions(g, g.getDecorList(), context); boolean fired = r1 || r2; if (fired) { context.computeAndSetTypeEnvironmentForOperator(g); @@ -64,56 +64,15 @@ public class ExtractGbyExpressionsRule extends AbstractExtractExprRule { return fired; } - private boolean gbyExprWasRewritten(GroupByOperator g, IOptimizationContext context) throws AlgebricksException { - if (!gbyHasComplexExpr(g)) { - return false; - } - Mutable<ILogicalOperator> opRef2 = g.getInputs().get(0); - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getGroupByList()) { - ILogicalExpression expr = gbyPair.second.getValue(); - if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) { - LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context); - VariableReferenceExpression vRef = new VariableReferenceExpression(v); - vRef.setSourceLocation(expr.getSourceLocation()); - gbyPair.second.setValue(vRef); - } - } - return true; - } - - private boolean decorExprWasRewritten(GroupByOperator g, IOptimizationContext context) throws AlgebricksException { - if (!decorHasComplexExpr(g)) { - return false; - } - Mutable<ILogicalOperator> opRef2 = g.getInputs().get(0); - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : g.getDecorList()) { - ILogicalExpression expr = decorPair.second.getValue(); - if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) { - LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context); - VariableReferenceExpression vRef = new VariableReferenceExpression(v); - vRef.setSourceLocation(expr.getSourceLocation()); - decorPair.second.setValue(vRef); - } - } - return true; - } - - private boolean gbyHasComplexExpr(GroupByOperator g) { - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getGroupByList()) { - if (gbyPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) { - return true; - } - } - return false; + private static boolean extractComplexExpressions(ILogicalOperator op, + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> exprList, IOptimizationContext context) + throws AlgebricksException { + return extractComplexExpressions(op, exprList, Pair::getSecond, context); } - private boolean decorHasComplexExpr(GroupByOperator g) { - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getDecorList()) { - if (gbyPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) { - return true; - } - } - return false; + public static <T> boolean extractComplexExpressions(ILogicalOperator op, List<T> exprList, + Function<T, Mutable<ILogicalExpression>> exprGetter, IOptimizationContext context) + throws AlgebricksException { + return extractComplexExpressions(op, exprList, exprGetter, t -> false, context); } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java index aa58985..e6f86be 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.algebricks.rewriter.rules; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; @@ -39,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; @@ -82,6 +84,10 @@ public class PushNestedOrderByUnderPreSortedGroupByRule implements IAlgebraicRew if (!isIndependentFromChildren(order1)) { return false; } + if (OperatorManipulationUtil.ancestorOfOperators(order1.getInputs().get(0).getValue(), + EnumSet.of(LogicalOperatorTag.ORDER))) { + return false; + } AbstractPhysicalOperator pOrder1 = (AbstractPhysicalOperator) op2.getPhysicalOperator(); if (pOrder1.getOperatorTag() != PhysicalOperatorTag.STABLE_SORT && pOrder1.getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java index 6b09894..1388ccb 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java @@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor; @@ -256,6 +257,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor return visit(op); } + @Override + public ILogicalOperator visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + return visit(op); + } + private ILogicalOperator visit(ILogicalOperator op) throws AlgebricksException { List<Map<LogicalVariable, LogicalVariable>> varMapSnapshots = new ArrayList<>(); for (Mutable<ILogicalOperator> childRef : op.getInputs()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml index bcc537a..c8f7cbf 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml +++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml @@ -73,6 +73,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-storage-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java new file mode 100644 index 0000000..b4030f7 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.base; + +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IWindowAggregateEvaluator extends IRunningAggregateEvaluator { + default void configure(IBinaryComparator[] orderComparators) { + } + + void initPartition(long partitionLength) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java new file mode 100644 index 0000000..27354cb --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.aggrun; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; + +public abstract class AbstractRunningAggregatePushRuntime<T extends IRunningAggregateEvaluator> + extends AbstractOneInputOneOutputOneFramePushRuntime { + protected final IHyracksTaskContext ctx; + private final IRunningAggregateEvaluatorFactory[] aggFactories; + private final Class<T> aggEvalClass; + protected final List<T> aggEvals; + private final int[] projectionList; + private final int[] projectionToOutColumns; + private final IPointable p = VoidPointable.FACTORY.createPointable(); + private final ArrayTupleBuilder tupleBuilder; + private boolean first; + + public AbstractRunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, + int[] projectionList, IHyracksTaskContext ctx, Class<T> aggEvalClass) { + this.ctx = ctx; + this.projectionList = projectionList; + this.aggFactories = aggFactories; + this.aggEvalClass = aggEvalClass; + aggEvals = new ArrayList<>(aggFactories.length); + tupleBuilder = new ArrayTupleBuilder(projectionList.length); + projectionToOutColumns = new int[projectionList.length]; + + for (int j = 0; j < projectionList.length; j++) { + projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]); + } + first = true; + } + + @Override + public void open() throws HyracksDataException { + super.open(); + if (first) { + first = false; + init(); + } + for (T aggEval : aggEvals) { + aggEval.init(); + } + } + + protected void init() throws HyracksDataException { + initAccessAppendRef(ctx); + for (IRunningAggregateEvaluatorFactory aggFactory : aggFactories) { + IRunningAggregateEvaluator aggEval = aggFactory.createRunningAggregateEvaluator(ctx); + aggEvals.add(aggEvalClass.cast(aggEval)); + } + } + + protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException { + for (int t = beginIdx; t <= endIdx; t++) { + tRef.reset(accessor, t); + produceTuple(tupleBuilder, accessor, t, tRef); + appendToFrameFromTupleBuilder(tupleBuilder); + } + } + + private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, + FrameTupleReference tupleRef) throws HyracksDataException { + tb.reset(); + for (int f = 0; f < projectionList.length; f++) { + int k = projectionToOutColumns[f]; + if (k >= 0) { + aggEvals.get(k).step(tupleRef, p); + tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength()); + } else { + tb.addField(accessor, tIndex, projectionList[f]); + } + } + } + + @Override + public void flush() throws HyracksDataException { + appender.flush(writer); + } +} \ No newline at end of file
