http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java index b754533..3fb3507 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java @@ -62,6 +62,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; import org.apache.asterix.lang.sqlpp.struct.SetOperationRight; import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil; import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppQueryExpressionVisitor; @@ -424,6 +425,16 @@ public class FreeVariableVisitor extends AbstractSqlppQueryExpressionVisitor<Voi return null; } + /** + * Traverses a window expression: visits the wrapped expression first, then the partition-by expressions + * (only when a partition list is present), then the order-by expressions, passing {@code freeVars} to + * every visit. Always returns {@code null}. + */ @Override + public Void visit(WindowExpression winExpr, Collection<VariableExpr> freeVars) throws CompilationException { + winExpr.getExpr().accept(this, freeVars); + if (winExpr.hasPartitionList()) { + visit(winExpr.getPartitionList(), freeVars); + } + visit(winExpr.getOrderbyList(), freeVars); + return null; + } + private void visitLetClauses(List<LetClause> letClauses, Collection<VariableExpr> freeVars) throws CompilationException { if (letClauses == null || letClauses.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java index 0973bec..e4d2a27 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java @@ -27,6 +27,7 @@ import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.clause.GroupbyClause; import org.apache.asterix.lang.common.clause.LetClause; +import org.apache.asterix.lang.common.clause.OrderbyClause; import org.apache.asterix.lang.common.expression.CallExpr; import org.apache.asterix.lang.common.expression.GbyVariableExpressionPair; import org.apache.asterix.lang.common.expression.VariableExpr; @@ -47,6 +48,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; import org.apache.asterix.lang.sqlpp.struct.SetOperationRight; import org.apache.asterix.lang.sqlpp.util.FunctionMapUtil; import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor; @@ -331,4 +333,28 @@ public class SqlppAstPrintVisitor extends QueryPrintVisitor implements ISqlppVis return null; } + /** + * Prints a window expression to {@code out}: {@code WINDOW} followed by the wrapped expression (visited + * with {@code step + 1}), then an {@code OVER (} section containing an optional {@code PARTITION BY} list + * and the {@code ORDER BY} list, where every order-by expression is followed by its order modifier, and + * finally a closing parenthesis. Always returns {@code null}. + */ @Override + public Void visit(WindowExpression winExpr, Integer step) throws CompilationException { + 
out.print(skip(step) + "WINDOW"); + winExpr.getExpr().accept(this, step + 1); + out.println(); + out.println(skip(step) + "OVER ("); + if (winExpr.hasPartitionList()) { + out.println(skip(step + 1) + "PARTITION BY"); + List<Expression> partitionList = winExpr.getPartitionList(); + for (Expression expr : partitionList) { + expr.accept(this, step + 2); + out.println(); + } + } + out.println(skip(step + 1) + "ORDER BY"); + List<Expression> orderbyList = winExpr.getOrderbyList(); + List<OrderbyClause.OrderModifier> orderbyModifierList = winExpr.getOrderbyModifierList(); + for (int i = 0, ln = orderbyList.size(); i < ln; i++) { + orderbyList.get(i).accept(this, step + 2); + out.println(" " + orderbyModifierList.get(i)); + } + out.println(skip(step) + ")"); + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java index e00a3bd..0c70572 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java @@ -50,6 +50,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; import 
org.apache.asterix.lang.sqlpp.struct.SetOperationInput; import org.apache.asterix.lang.sqlpp.struct.SetOperationRight; import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor; @@ -411,4 +412,20 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV newCaseExpr.setSourceLocation(caseExpr.getSourceLocation()); return new Pair<>(newCaseExpr, env); } + + /** + * Builds a new {@code WindowExpression} from {@code winExpr}. The wrapped expression is replaced by the + * result of visiting it with {@code env}; the partition list ({@code null} when absent) and the order-by + * list are produced by {@code VariableCloneAndSubstitutionUtil.visitAndCloneExprList}; the order modifiers + * are copied into a new list. The source location and hints of {@code winExpr} are carried over. + * Returns the new expression paired with the same {@code env} that was passed in. + */ @Override + public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(WindowExpression winExpr, + VariableSubstitutionEnvironment env) throws CompilationException { + Expression newExpr = (Expression) winExpr.getExpr().accept(this, env).first; + List<Expression> newPartitionList = winExpr.hasPartitionList() + ? VariableCloneAndSubstitutionUtil.visitAndCloneExprList(winExpr.getPartitionList(), env, this) : null; + List<Expression> newOrderbyList = + VariableCloneAndSubstitutionUtil.visitAndCloneExprList(winExpr.getOrderbyList(), env, this); + List<OrderbyClause.OrderModifier> newOrderbyModifierList = new ArrayList<>(winExpr.getOrderbyModifierList()); + WindowExpression newWinExpr = + new WindowExpression(newExpr, newPartitionList, newOrderbyList, newOrderbyModifierList); + newWinExpr.setSourceLocation(winExpr.getSourceLocation()); + newWinExpr.addHints(winExpr.getHints()); + return new Pair<>(newWinExpr, env); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java index 755cc69..99368f8 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java +++ 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java @@ -44,6 +44,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; import org.apache.asterix.lang.sqlpp.struct.SetOperationRight; import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil; import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor; @@ -330,4 +331,25 @@ public class SqlppFormatPrintVisitor extends FormatPrintVisitor implements ISqlp out.print(SqlppVariableUtil.toUserDefinedName(v.getVar().getValue())); return null; } + + /** + * Prints a window expression in query form: {@code window}, the wrapped expression, {@code over (}, an + * optional {@code partition by} list whose items are separated by {@code COMMA}, {@code order by} with the + * order-by keys and their modifiers, and a closing parenthesis. + * The {@code partition by} keyword is printed before the partition expressions so that they cannot be + * confused with the order-by keys that follow them. + * + * @param windowExpr the window expression to print + * @param step the indentation step passed to {@code skip} and to the nested visits + * @return always {@code null} + * @throws CompilationException if visiting a nested expression fails + */ + @Override + public Void visit(WindowExpression windowExpr, Integer step) throws CompilationException { + out.print(skip(step) + "window "); + windowExpr.getExpr().accept(this, step + 2); + out.print(skip(step) + " over ("); + if (windowExpr.hasPartitionList()) { + out.print("partition by "); + List<Expression> partitionList = windowExpr.getPartitionList(); + for (int i = 0, ln = partitionList.size(); i < ln; i++) { + if (i > 0) { + out.print(COMMA); + } + Expression partExpr = partitionList.get(i); + partExpr.accept(this, step + 2); + } + } + out.print(" order by "); + printDelimitedObyExpressions(windowExpr.getOrderbyList(), windowExpr.getOrderbyModifierList(), step + 2); + out.println(skip(step) + ")"); + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java index 5396768..92ebd60 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java @@ -34,6 +34,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; /** * A dummy abstract visitor to allow an implementation to only fill in necessary stuff. @@ -110,4 +111,8 @@ public abstract class AbstractSqlppAstVisitor<R, T> extends AbstractAstVisitor<R return null; } + /** + * Default implementation for window expressions: performs no work and returns {@code null}. + */ @Override + public R visit(WindowExpression winExpr, T arg) throws CompilationException { + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java index 4d35914..a3bb592 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java @@ -61,6 +61,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import 
org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; import org.apache.asterix.lang.sqlpp.struct.SetOperationRight; public class AbstractSqlppSimpleExpressionVisitor @@ -321,6 +322,16 @@ public class AbstractSqlppSimpleExpressionVisitor } /** + * Rewrites a window expression in place: the wrapped expression, the partition list (when present) and the + * order-by list are each replaced by the result of visiting them, and the same {@code winExpr} instance is + * returned. The wrapped expression is visited with the incoming {@code arg}, while both lists are visited + * with {@code winExpr} as the argument. + * NOTE(review): {@code visit(FieldAccessor)} passes the node itself when visiting its child expression; + * confirm that passing {@code arg} for the wrapped expression is intended here. + */ @Override + public Expression visit(WindowExpression winExpr, ILangExpression arg) throws CompilationException { + winExpr.setExpr(visit(winExpr.getExpr(), arg)); + if (winExpr.hasPartitionList()) { + winExpr.setPartitionList(visit(winExpr.getPartitionList(), winExpr)); + } + winExpr.setOrderbyList(visit(winExpr.getOrderbyList(), winExpr)); + return winExpr; + } + + @Override public Expression visit(FieldAccessor fa, ILangExpression arg) throws CompilationException { fa.setExpr(visit(fa.getExpr(), fa)); return fa; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java index 02d9142..13addaf 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java @@ -34,6 +34,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import 
org.apache.asterix.lang.sqlpp.expression.WindowExpression; public interface ISqlppVisitor<R, T> extends ILangVisitor<R, T> { @@ -64,4 +65,6 @@ public interface ISqlppVisitor<R, T> extends ILangVisitor<R, T> { R visit(HavingClause havingClause, T arg) throws CompilationException; R visit(CaseExpression caseExpression, T arg) throws CompilationException; + + R visit(WindowExpression windowExpression, T arg) throws CompilationException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index 13d1f8d..e19ee7a 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -166,6 +166,7 @@ import org.apache.asterix.lang.sqlpp.clause.UnnestClause; import org.apache.asterix.lang.common.clause.WhereClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.expression.WindowExpression; import org.apache.asterix.lang.sqlpp.optype.JoinType; import org.apache.asterix.lang.sqlpp.optype.SetOpType; import org.apache.asterix.lang.sqlpp.struct.SetOperationInput; @@ -2588,7 +2589,7 @@ FieldBinding FieldBinding() throws ParseException: Expression FunctionCallExpr() throws ParseException: { - CallExpr callExpr; + Expression resultExpr; List<Expression> argList = new ArrayList<Expression>(); Expression tmp = null; int arity = 0; @@ -2596,6 +2597,10 @@ Expression FunctionCallExpr() throws ParseException: String hint = null; boolean star = false; boolean distinct = false; + Token overToken = null; + Expression partitionExpr = null; + List<Expression> partitionExprs = new ArrayList<Expression>(); + OrderbyClause orderByClause = null; } { funcName = 
FunctionName() @@ -2634,7 +2639,7 @@ Expression FunctionCallExpr() throws ParseException: if (signature == null) { signature = new FunctionSignature(funcName.dataverse, fqFunctionName, arity); } - callExpr = FunctionMapUtil.normalizedListInputFunctions(new CallExpr(signature,argList)); + CallExpr callExpr = FunctionMapUtil.normalizedListInputFunctions(new CallExpr(signature,argList)); if (hint != null) { if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) { callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE); @@ -2643,8 +2648,29 @@ Expression FunctionCallExpr() throws ParseException: } } callExpr.setSourceLocation(funcName.sourceLoc); - return callExpr; + resultExpr = callExpr; } + + ( + <OVER> { overToken = token; } + <LEFTPAREN> + ( + <PARTITION> <BY> + partitionExpr = Expression() { partitionExprs.add(partitionExpr); } + ( <COMMA> partitionExpr = Expression() { partitionExprs.add(partitionExpr); } )* + )? + orderByClause = OrderbyClause() + <RIGHTPAREN> + { + WindowExpression winExp = new WindowExpression(callExpr, partitionExprs, orderByClause.getOrderbyList(), + orderByClause.getModifierList()); + resultExpr = addSourceLocation(winExp, overToken); + } + )? 
+ + { + return resultExpr; + } } Expression ParenthesizedExpression() throws ParseException: @@ -3393,7 +3419,9 @@ TOKEN [IGNORE_CASE]: | <ORDER : "order"> | <OUTER : "outer"> | <OUTPUT : "output"> + | <OVER: "over"> | <PATH : "path"> + | <PARTITION : "partition"> | <POLICY : "policy"> | <PRESORTED : "pre-sorted"> | <PRIMARY : "primary"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java index 3fb03a1..ccb124a 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java @@ -132,10 +132,13 @@ import org.apache.asterix.om.typecomputer.impl.UnorderedListConstructorTypeCompu import org.apache.asterix.om.types.hierachy.ATypeHierarchy; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; +import org.apache.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer; public class BuiltinFunctions { @@ -162,6 +165,9 @@ public class BuiltinFunctions { private static final 
Map<IFunctionInfo, IFunctionInfo> scalarToAggregateFunctionMap = new HashMap<>(); private static final Map<IFunctionInfo, IFunctionInfo> distinctToRegularScalarAggregateFunctionMap = new HashMap<>(); + private static final Map<IFunctionInfo, IFunctionInfo> builtinWindowFunctions = new HashMap<>(); + private static final Set<IFunctionInfo> builtinWindowFunctionsWithOrderArgs = new HashSet<>(); + private static final Set<IFunctionInfo> builtinWindowFunctionsWithMaterialization = new HashSet<>(); private static final Map<IFunctionInfo, SpatialFilterKind> spatialFilterFunctions = new HashMap<>(); @@ -809,6 +815,26 @@ public class BuiltinFunctions { public static final FunctionIdentifier SCALAR_SQL_VAR_POP_DISTINCT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-var_pop-distinct", 1); + // window functions + public static final FunctionIdentifier ROW_NUMBER = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "row-number", 0); + public static final FunctionIdentifier ROW_NUMBER_IMPL = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "row-number-impl", 0); + public static final FunctionIdentifier RANK = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "rank", 0); + public static final FunctionIdentifier RANK_IMPL = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "rank-impl", FunctionIdentifier.VARARGS); + public static final FunctionIdentifier DENSE_RANK = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dense-rank", 0); + public static final FunctionIdentifier DENSE_RANK_IMPL = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dense-rank-impl", FunctionIdentifier.VARARGS); + public static final FunctionIdentifier PERCENT_RANK = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "percent-rank", 0); + public static final FunctionIdentifier PERCENT_RANK_IMPL = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "percent-rank-impl", FunctionIdentifier.VARARGS); + public static final FunctionIdentifier NTILE = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS, "ntile", 1); + public static final FunctionIdentifier NTILE_IMPL = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "ntile-impl", FunctionIdentifier.VARARGS); + // unnesting functions public static final FunctionIdentifier SCAN_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "scan-collection", 1); @@ -1727,6 +1753,19 @@ public class BuiltinFunctions { addFunction(SQL_VAR_POP_DISTINCT, NullableDoubleTypeComputer.INSTANCE, true); addFunction(SCALAR_SQL_VAR_POP_DISTINCT, NullableDoubleTypeComputer.INSTANCE, true); + // Window functions + + addFunction(ROW_NUMBER, AInt64TypeComputer.INSTANCE, true); + addPrivateFunction(ROW_NUMBER_IMPL, AInt64TypeComputer.INSTANCE, true); + addFunction(RANK, AInt64TypeComputer.INSTANCE, true); + addPrivateFunction(RANK_IMPL, AInt64TypeComputer.INSTANCE, true); + addFunction(DENSE_RANK, AInt64TypeComputer.INSTANCE, true); + addPrivateFunction(DENSE_RANK_IMPL, AInt64TypeComputer.INSTANCE, true); + addFunction(PERCENT_RANK, ADoubleTypeComputer.INSTANCE, true); + addPrivateFunction(PERCENT_RANK_IMPL, ADoubleTypeComputer.INSTANCE, true); + addFunction(NTILE, AInt64TypeComputer.INSTANCE, true); + addPrivateFunction(NTILE_IMPL, AInt64TypeComputer.INSTANCE, true); + // Similarity functions addFunction(EDIT_DISTANCE_CONTAINS, OrderedListOfAnyTypeComputer.INSTANCE, true); addFunction(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE, true); @@ -2487,6 +2526,15 @@ public class BuiltinFunctions { } static { + // Window functions + addWindowFunction(ROW_NUMBER, ROW_NUMBER_IMPL, false, false); + addWindowFunction(RANK, RANK_IMPL, true, false); + addWindowFunction(DENSE_RANK, DENSE_RANK_IMPL, true, false); + addWindowFunction(PERCENT_RANK, PERCENT_RANK_IMPL, true, true); + addWindowFunction(NTILE, NTILE_IMPL, false, true); + } + + static { addUnnestFun(RANGE, true); addUnnestFun(SCAN_COLLECTION, false); addUnnestFun(SUBSET_COLLECTION, false); @@ -2667,6 +2715,40 @@ public class 
BuiltinFunctions { getAsterixFunctionInfo(regularscalarfi)); } + /** + * Registers a window function: maps the function info of {@code fi} to the function info of {@code implfi} + * in {@code builtinWindowFunctions}, and additionally records the implementation's function info in + * {@code builtinWindowFunctionsWithOrderArgs} and/or {@code builtinWindowFunctionsWithMaterialization} + * when the corresponding flag is {@code true}. + */ public static void addWindowFunction(FunctionIdentifier fi, FunctionIdentifier implfi, boolean requiresOrderArgs, + boolean requiresMaterialization) { + IFunctionInfo implFinfo = getAsterixFunctionInfo(implfi); + builtinWindowFunctions.put(getAsterixFunctionInfo(fi), implFinfo); + if (requiresOrderArgs) { + builtinWindowFunctionsWithOrderArgs.add(implFinfo); + } + if (requiresMaterialization) { + builtinWindowFunctionsWithMaterialization.add(implFinfo); + } + } + + /** + * Returns {@code true} when the function info of {@code fi} is a key of {@code builtinWindowFunctions}, + * i.e. when it was registered through {@code addWindowFunction} as the first argument. + */ public static boolean isBuiltinWindowFunction(FunctionIdentifier fi) { + return builtinWindowFunctions.containsKey(getAsterixFunctionInfo(fi)); + } + + /** + * Returns {@code true} when the function info of {@code implfi} is contained in + * {@code builtinWindowFunctionsWithOrderArgs}. Note that the set holds implementation function infos. + */ public static boolean windowFunctionRequiresOrderArgs(FunctionIdentifier implfi) { + return builtinWindowFunctionsWithOrderArgs.contains(getAsterixFunctionInfo(implfi)); + } + + /** + * Returns {@code true} when the function info of {@code implfi} is contained in + * {@code builtinWindowFunctionsWithMaterialization}. Note that the set holds implementation function infos. + */ public static boolean windowFunctionRequiresMaterialization(FunctionIdentifier implfi) { + return builtinWindowFunctionsWithMaterialization.contains(getAsterixFunctionInfo(implfi)); + } + + /** + * Creates a {@code StatefulFunctionCallExpression} for the implementation function registered for + * {@code scalarfi}, using {@code UnpartitionedPropertyComputer.INSTANCE} and the given {@code args}. + * Throws {@code IllegalStateException} when no implementation is registered for {@code scalarfi}. + */ public static AbstractFunctionCallExpression makeWindowFunctionExpression(FunctionIdentifier scalarfi, + List<Mutable<ILogicalExpression>> args) { + IFunctionInfo finfo = getAsterixFunctionInfo(scalarfi); + IFunctionInfo implFinfo = builtinWindowFunctions.get(finfo); + if (implFinfo == null) { + throw new IllegalStateException("no implementation for window function " + finfo); + } + return new StatefulFunctionCallExpression(implFinfo, UnpartitionedPropertyComputer.INSTANCE, args); + } + static { spatialFilterFunctions.put(getAsterixFunctionInfo(BuiltinFunctions.SPATIAL_INTERSECT), SpatialFilterKind.SI); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java index 19c33db..1b0c7b6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java @@ -439,6 +439,11 @@ import org.apache.asterix.runtime.evaluators.functions.temporal.UnixTimeFromDate import org.apache.asterix.runtime.evaluators.functions.temporal.UnixTimeFromTimeInMsDescriptor; import org.apache.asterix.runtime.evaluators.functions.temporal.YearMonthDurationGreaterThanComparatorDescriptor; import org.apache.asterix.runtime.evaluators.functions.temporal.YearMonthDurationLessThanComparatorDescriptor; +import org.apache.asterix.runtime.runningaggregates.std.DenseRankRunningAggregateDescriptor; +import org.apache.asterix.runtime.runningaggregates.std.NtileRunningAggregateDescriptor; +import org.apache.asterix.runtime.runningaggregates.std.PercentRankRunningAggregateDescriptor; +import org.apache.asterix.runtime.runningaggregates.std.RankRunningAggregateDescriptor; +import org.apache.asterix.runtime.runningaggregates.std.RowNumberRunningAggregateDescriptor; import org.apache.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor; import org.apache.asterix.runtime.unnestingfunctions.std.RangeDescriptor; import org.apache.asterix.runtime.unnestingfunctions.std.ScanCollectionDescriptor; @@ -632,6 +637,13 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(ScalarSqlVarAggregateDescriptor.FACTORY); fc.add(ScalarSqlVarPopAggregateDescriptor.FACTORY); + // window functions + fc.add(RowNumberRunningAggregateDescriptor.FACTORY); + fc.add(RankRunningAggregateDescriptor.FACTORY); + fc.add(DenseRankRunningAggregateDescriptor.FACTORY); + 
fc.add(PercentRankRunningAggregateDescriptor.FACTORY); + fc.add(NtileRunningAggregateDescriptor.FACTORY); + // boolean functions fc.add(AndDescriptor.FACTORY); fc.add(OrDescriptor.FACTORY); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java new file mode 100644 index 0000000..54bcc1a --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import java.io.DataOutput; + +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +/** + * Base evaluator implementation for ranking window functions: + * {@code rank()}, {@code dense_rank()}, {@code percent_rank()} + */ +public abstract class AbstractRankRunningAggregateEvaluator implements IWindowAggregateEvaluator { + + private final IScalarEvaluator[] args; + + /** Argument values recorded at the end of the previous {@code step} call, one storage per argument. */ private final ArrayBackedValueStorage[] argPrevValues; + + /** Argument values evaluated for the tuple currently being processed, one pointable per argument. */ private final IPointable[] argCurrValues; + + /** When {@code true} the rank advances by 1 on a value change, otherwise by the size of the previous group. */ private final boolean dense; + + protected final SourceLocation sourceLoc; + + private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); + + /** Comparators received through {@code configure}; indexed in parallel with {@code args}. */ private IBinaryComparator[] argComparators; + + /** {@code true} until the first tuple of the current partition has been processed. */ private boolean first; + + private long rank; + + /** Number of consecutive tuples seen so far whose argument values compare equal. */ private long groupSize; + + /** + * Stores the argument evaluators and flags and allocates one previous-value storage and one current-value + * pointable per argument. + */ AbstractRankRunningAggregateEvaluator(IScalarEvaluator[] args, boolean dense, SourceLocation sourceLoc) { + this.args = args; + this.dense = dense; + this.sourceLoc = sourceLoc; + argPrevValues = new ArrayBackedValueStorage[args.length]; + argCurrValues = new IPointable[args.length]; + for (int i = 0; i < args.length; i++) { + argPrevValues[i] = new ArrayBackedValueStorage(); + argCurrValues[i] = VoidPointable.FACTORY.createPointable(); + } + } + + /** Remembers the given comparators; they are used to compare previous and current argument values. */ @Override + public void configure(IBinaryComparator[] orderComparators) { + argComparators = orderComparators; + } + + @Override + public void init() throws 
HyracksDataException { + } + + /** Marks the start of a new partition so that the next tuple is ranked 1; {@code partitionLength} is unused here. */ @Override + public void initPartition(long partitionLength) { + first = true; + } + + /** + * Processes one tuple: evaluates every argument on {@code tuple}, updates the rank, assigns the current + * argument values to the previous-value storages for the next call, and sets {@code result} to the bytes + * produced by {@code writeResult} for the current rank. + */ @Override + public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + for (int i = 0; i < args.length; i++) { + args[i].evaluate(tuple, argCurrValues[i]); + } + + computeRank(); + + for (int i = 0; i < args.length; i++) { + argPrevValues[i].assign(argCurrValues[i]); + } + + resultStorage.reset(); + writeResult(rank, resultStorage.getDataOutput()); + result.set(resultStorage); + } + + /** Writes the function result derived from {@code rank} to {@code out}; the encoding is defined by subclasses. */ protected abstract void writeResult(long rank, DataOutput out) throws HyracksDataException; + + /** + * Updates {@code rank} and {@code groupSize} for the current tuple. The first tuple of a partition gets + * rank 1. A tuple whose argument values equal the previous tuple's keeps the rank and grows the group. + * Otherwise the rank advances by 1 in dense mode or by the previous group's size, and a new group starts. + */ private void computeRank() throws HyracksDataException { + if (first) { + rank = 1; + groupSize = 1; + first = false; + } else if (sameGroup()) { + groupSize++; + } else { + rank += dense ? 1 : groupSize; + groupSize = 1; + } + } + + /** + * Returns {@code true} when, for every argument, the previous and the current value compare as equal + * under the comparator at the same index of {@code argComparators}. + */ private boolean sameGroup() throws HyracksDataException { + for (int i = 0; i < args.length; i++) { + IPointable v1 = argPrevValues[i]; + IPointable v2 = argCurrValues[i]; + IBinaryComparator cmp = argComparators[i]; + if (cmp.compare(v1.getByteArray(), v1.getStartOffset(), v1.getLength(), v2.getByteArray(), + v2.getStartOffset(), v2.getLength()) != 0) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java new file mode 100644 index 0000000..6e51559 --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * Descriptor {@code dense_rank()} window function + */ +public class DenseRankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor { + private static final long 
serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = DenseRankRunningAggregateDescriptor::new; + + @Override + public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + return new IRunningAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + IScalarEvaluator[] evals = new IScalarEvaluator[args.length]; + for (int i = 0; i < args.length; i++) { + evals[i] = args[i].createScalarEvaluator(ctx); + } + return new RankRunningAggregateEvaluator(evals, true, sourceLoc); + } + }; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.DENSE_RANK_IMPL; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java new file mode 100644 index 0000000..5157451 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * Descriptor {@code ntile()} window function + */ +public class NtileRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = NtileRunningAggregateDescriptor::new; + + @Override + public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + return new IRunningAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + return new NtileRunningAggregateEvaluator(args[0].createScalarEvaluator(ctx), getIdentifier()); + } + }; + } + + @Override + public 
FunctionIdentifier getIdentifier() { + return BuiltinFunctions.NTILE_IMPL; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java new file mode 100644 index 0000000..9bd306e --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.base.AMutableInt64; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +/** + * Evaluator {@code ntile()} window function + */ +public class NtileRunningAggregateEvaluator implements IWindowAggregateEvaluator { + + private final IScalarEvaluator evalNumGroups; + + private final VoidPointable argNumGroups = VoidPointable.FACTORY.createPointable(); + + private final FunctionIdentifier funId; + + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<AInt64> serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); + + private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); + + private final AMutableInt64 aInt64 = new AMutableInt64(0); + + private long partitionLength; + + private long groupSize; + + private long groupRemainder; + + private long resultValue; + + private long count; + + NtileRunningAggregateEvaluator(IScalarEvaluator evalNumGroups, FunctionIdentifier funId) { + this.evalNumGroups = evalNumGroups; + this.funId = funId; + } + + @Override + public void init() throws HyracksDataException { 
+ } + + @Override + public void initPartition(long partitionLength) { + this.partitionLength = partitionLength; + resultValue = 0; + } + + @Override + public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + if (resultValue == 0) { + evaluateGroupSize(tuple); + resultValue = count = 1; + } else if (count < groupSize) { + count++; + } else if (count == groupSize && groupRemainder > 0) { + groupRemainder--; + count++; + } else { + resultValue++; + count = 1; + } + + resultStorage.reset(); + aInt64.setValue(resultValue); + serde.serialize(aInt64, resultStorage.getDataOutput()); + result.set(resultStorage); + } + + private void evaluateGroupSize(IFrameTupleReference tuple) throws HyracksDataException { + evalNumGroups.evaluate(tuple, argNumGroups); + byte[] bytes = argNumGroups.getByteArray(); + int offset = argNumGroups.getStartOffset(); + long numGroups = ATypeHierarchy.getLongValue(funId.getName(), 0, bytes, offset); + if (numGroups > partitionLength || numGroups <= 0) { + groupSize = partitionLength; + groupRemainder = 0; + } else { + groupSize = partitionLength / numGroups; + groupRemainder = partitionLength % numGroups; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java new file mode 100644 index 0000000..180ca99 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * Descriptor {@code percent_rank()} window function + */ +public class PercentRankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = PercentRankRunningAggregateDescriptor::new; + + @Override + public IRunningAggregateEvaluatorFactory 
createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + return new IRunningAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + IScalarEvaluator[] evals = new IScalarEvaluator[args.length]; + for (int i = 0; i < args.length; i++) { + evals[i] = args[i].createScalarEvaluator(ctx); + } + return new PercentRankRunningAggregateEvaluator(evals, sourceLoc); + } + }; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.PERCENT_RANK_IMPL; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java new file mode 100644 index 0000000..c73d9fd --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import java.io.DataOutput; + +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.ADouble; +import org.apache.asterix.om.base.AMutableDouble; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; + +/** + * Evaluator {@code percent_rank()} window function + */ +class PercentRankRunningAggregateEvaluator extends AbstractRankRunningAggregateEvaluator { + + private final AMutableDouble aDouble = new AMutableDouble(0); + + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<ADouble> serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); + + private double divisor; + + PercentRankRunningAggregateEvaluator(IScalarEvaluator[] args, SourceLocation sourceLoc) { + super(args, false, sourceLoc); + } + + @Override + public void initPartition(long partitionLength) { + super.initPartition(partitionLength); + divisor = (double) partitionLength - 1; + } + + @Override + protected void writeResult(long rank, DataOutput out) throws HyracksDataException { + double percentRank = (rank - 1) / divisor; + aDouble.setValue(percentRank); + serde.serialize(aDouble, out); + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java new file mode 100644 index 0000000..d5db134 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * Descriptor {@code rank()} window function + */ +public class RankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = RankRunningAggregateDescriptor::new; + + @Override + public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + return new IRunningAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + IScalarEvaluator[] evals = new IScalarEvaluator[args.length]; + for (int i = 0; i < args.length; i++) { + evals[i] = args[i].createScalarEvaluator(ctx); + } + return new RankRunningAggregateEvaluator(evals, false, sourceLoc); + } + }; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.RANK_IMPL; + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java new file mode 100644 index 0000000..56ab299 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import java.io.DataOutput; + +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.base.AMutableInt64; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; + +/** + * Evaluator for {@code rank()} and {@code dense_rank()} window functions + */ +class RankRunningAggregateEvaluator extends AbstractRankRunningAggregateEvaluator { + + private final AMutableInt64 aInt64 = new AMutableInt64(0); + + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<AInt64> serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); + + RankRunningAggregateEvaluator(IScalarEvaluator[] args, boolean dense, SourceLocation sourceLoc) { + super(args, dense, sourceLoc); + } + + protected void writeResult(long rank, DataOutput out) throws HyracksDataException { + aInt64.setValue(rank); + serde.serialize(aInt64, out); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java new file mode 100644 index 0000000..7464751 --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; + +/** + * Descriptor for the {@code row_number()} window function + */ +public class RowNumberRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + + /** Factory that creates descriptors through the no-argument constructor. */ public static final IFunctionDescriptorFactory FACTORY = RowNumberRunningAggregateDescriptor::new; + + /** Returns a factory whose evaluators are new {@code RowNumberRunningAggregateEvaluator} instances; {@code args} is not used. */ @Override + 
public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + return new IRunningAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) { + /* ctx is not consulted: the evaluator is built with no arguments */ return new RowNumberRunningAggregateEvaluator(); + } + }; + } + + /** Returns {@code BuiltinFunctions.ROW_NUMBER_IMPL} as this descriptor's function identifier. */ @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.ROW_NUMBER_IMPL; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java new file mode 100644 index 0000000..75fface --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.runningaggregates.std; + +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.base.AMutableInt64; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +/** + * Evaluator for the {@code row_number()} window function. + * Emits a counter that is reset by {@code initPartition} and incremented before each write, + * so the first {@code step} after a reset produces 1. + */ +class RowNumberRunningAggregateEvaluator implements IWindowAggregateEvaluator { + + /* Serializer looked up once for BuiltinType.AINT64. */ @SuppressWarnings("unchecked") + private final ISerializerDeserializer<AInt64> serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); + + /* Output buffer, reset and refilled on every step call. */ private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); + + /* Mutable holder overwritten with the current counter before serialization. */ private final AMutableInt64 aInt64 = new AMutableInt64(0); + + /* Counter value most recently written for the current partition. */ private long resultValue; + + @Override + public void init() { + // nothing to do + } + + @Override + public void initPartition(long partitionLength) { + resultValue = 0; + } + + @Override + public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + resultValue++; + + resultStorage.reset(); + aInt64.setValue(resultValue); + serde.serialize(aInt64, resultStorage.getDataOutput()); + result.set(resultStorage); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java ---------------------------------------------------------------------- diff 
--git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java index 2dd71cd..a0d5880 100644 --- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java +++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java @@ -33,6 +33,22 @@ public class Pair<T1, T2> implements Serializable { this.second = second; } + /** Replaces the first component with {@code value}. */ public void setFirst(T1 value) { + first = value; + } + + /** Returns the current first component. */ public T1 getFirst() { + return first; + } + + /** Replaces the second component with {@code value}. */ public void setSecond(T2 value) { + second = value; + } + + /** Returns the current second component. */ public T2 getSecond() { + return second; + } + @Override public String toString() { return first + "," + second; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java index 6bd0d02..d996caf 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java @@ -89,7 +89,7 @@ public interface ILogicalOperator { */ public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties, - IOptimizationContext context); + IOptimizationContext context) throws AlgebricksException; /** * @return 
the physical properties that this operator delivers, based on http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java index 2a92aba..a88ec64 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java @@ -40,7 +40,7 @@ public interface IPhysicalOperator { * @return for each child, one vector of required physical properties */ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context); + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException; /** * @return the physical properties that this operator delivers, based on http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java index 3794328..4466408 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java @@ -56,4 +56,5 @@ public enum LogicalOperatorTag { WRITE, WRITE_RESULT, INTERSECT, + WINDOW } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index ac1de5a..5d19134 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -79,4 +79,5 @@ public enum PhysicalOperatorTag { UPDATE, WRITE_RESULT, INTERSECT, + WINDOW } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java index 51040fd..e15ed92 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; /** @@ -71,4 +72,17 @@ public abstract class AbstractAssignOperator extends AbstractLogicalOperator { return modif; } + protected VariablePropagationPolicy createVariablePropagationPolicy(boolean propagateInputVars) { + return new VariablePropagationPolicy() { + @Override + public void propagateVariables(IOperatorSchema target, IOperatorSchema... 
sources) { + if (propagateInputVars) { + target.addAllVariables(sources[0]); + } + for (LogicalVariable v : variables) { + target.addVariable(v); + } + } + }; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java index 1dbf15e..d4a9d37 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java @@ -115,7 +115,7 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator { @Override public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren( - IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) { + IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) throws AlgebricksException { return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java index b4a59a8..35cb087 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java @@ -57,16 +57,7 @@ public class AggregateOperator extends AbstractAssignOperator { @Override public VariablePropagationPolicy getVariablePropagationPolicy() { - return new VariablePropagationPolicy() { - - @Override - public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) - throws AlgebricksException { - for (LogicalVariable v : variables) { - target.addVariable(v); - } - } - }; + return createVariablePropagationPolicy(false); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java index 861d74c..202c291 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java @@ -65,18 +65,7 @@ public class AssignOperator extends AbstractAssignOperator { @Override public VariablePropagationPolicy getVariablePropagationPolicy() { - return 
new VariablePropagationPolicy() { - - @Override - public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) - throws AlgebricksException { - target.addAllVariables(sources[0]); - for (LogicalVariable v : variables) { - target.addVariable(v); - } - } - }; - + return createVariablePropagationPolicy(true); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java index ef16613..8a710cc 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java @@ -49,17 +49,7 @@ public class RunningAggregateOperator extends AbstractAssignOperator { @Override public VariablePropagationPolicy getVariablePropagationPolicy() { - return new VariablePropagationPolicy() { - - @Override - public void propagateVariables(IOperatorSchema target, IOperatorSchema... 
sources) - throws AlgebricksException { - target.addAllVariables(sources[0]); - for (LogicalVariable v : variables) { - target.addVariable(v); - } - } - }; + return createVariablePropagationPolicy(true); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java new file mode 100644 index 0000000..aa1791f --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.algebricks.core.algebra.operators.logical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; +import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; + +/** + * Window operator evaluates window functions. It has the following components: + * <ul> + * <li>{@link #partitionExpressions} - define how input data must be partitioned</li> + * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered</li> + * <li>{@link #expressions} - window function expressions (running aggregates)</li> + * <li>{@link #variables} - output variables containing return values of these functions</li> + * </ul> + * + * Window operator does not change cardinality of the input stream. 
+ */ +public class WindowOperator extends AbstractAssignOperator { + + private final List<Mutable<ILogicalExpression>> partitionExpressions; + + private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions; + + /** Copies both argument lists into new ArrayLists (a null {@code partitionExpressions} or {@code orderExpressions} yields an empty list); {@code variables} and {@code expressions} are passed to the superclass constructor. */ public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions, + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions, + List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) { + super(variables, expressions); + this.partitionExpressions = new ArrayList<>(); + if (partitionExpressions != null) { + this.partitionExpressions.addAll(partitionExpressions); + } + this.orderExpressions = new ArrayList<>(); + if (orderExpressions != null) { + this.orderExpressions.addAll(orderExpressions); + } + } + + @Override + public LogicalOperatorTag getOperatorTag() { + return LogicalOperatorTag.WINDOW; + } + + /** Returns the operator's own order-expression list (not a copy), so changes made by the caller affect this operator. */ public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getOrderExpressions() { + return orderExpressions; + } + + /** Returns the operator's own partition-expression list (not a copy), so changes made by the caller affect this operator. */ public List<Mutable<ILogicalExpression>> getPartitionExpressions() { + return partitionExpressions; + } + + @Override + public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException { + return visitor.visitWindowOperator(this, arg); + } + + /** Delegates to the superclass first, then applies {@code visitor} to every partition expression and to the expression of every order pair; returns true if any of these calls reported a modification. */ @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { + boolean mod = super.acceptExpressionTransform(visitor); + for (Mutable<ILogicalExpression> expr : partitionExpressions) { + mod |= visitor.transform(expr); + } + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) { + mod |= visitor.transform(p.second); + } + return mod; + } + + @Override + public VariablePropagationPolicy getVariablePropagationPolicy() { + /* true is the propagateInputVars argument of the inherited helper */ return createVariablePropagationPolicy(true); + } + + @Override + public boolean isMap() { + return false; + } + + /** Builds the output type environment: starts from {@code createPropagatingAllInputsTypeEnvironment(ctx)} and registers, for each index i, the computed type of {@code expressions.get(i)} under {@code variables.get(i)}. */ @Override + public IVariableTypeEnvironment 
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { + IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx); + int n = variables.size(); + for (int i = 0; i < n; i++) { + /* env itself is handed to the type computer, so types registered in earlier iterations are already in it */ env.setVarType(variables.get(i), ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(), + ctx.getMetadataProvider(), env)); + } + return env; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java index 9d853eb..8535204 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java @@ -61,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; 
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -160,6 +161,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long } /** Returns whatever this visitor computes for the operator's first input, unchanged. */ @Override + public Long visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { + return op.getInputs().get(0).getValue().accept(this, arg); + } + + @Override public Long visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException { return adjustCardinalityForTupleReductionOperator(op.getInputs().get(0).getValue().accept(this, arg)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java index 16fc1ed..7042794 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java @@ -78,6 +78,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; 
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; @@ -812,4 +813,9 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void, return null; } + /** Delegates to {@code propagateFDsAndEquivClasses(op, ctx)} and returns null. */ @Override + public Void visitWindowOperator(WindowOperator op, IOptimizationContext ctx) throws AlgebricksException { + propagateFDsAndEquivClasses(op, ctx); + return null; + } }
