http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java deleted file mode 100644 index 9abd82d..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java +++ /dev/null @@ -1,839 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.AnalyticWindow.Boundary; -import com.cloudera.impala.analysis.AnalyticWindow.BoundaryType; -import com.cloudera.impala.catalog.AggregateFunction; -import com.cloudera.impala.catalog.Function; -import com.cloudera.impala.catalog.ScalarType; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.common.TreeNode; -import com.cloudera.impala.service.FeSupport; -import com.cloudera.impala.thrift.TColumnValue; -import com.cloudera.impala.thrift.TExprNode; -import com.cloudera.impala.util.TColumnValueUtil; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Representation of an analytic function call with OVER clause. - * All "subexpressions" (such as the actual function call parameters as well as the - * partition/ordering exprs, etc.) are embedded as children in order to allow expr - * substitution: - * function call params: child 0 .. #params - * partition exprs: children #params + 1 .. #params + #partition-exprs - * ordering exprs: - * children #params + #partition-exprs + 1 .. - * #params + #partition-exprs + #order-by-elements - * exprs in windowing clause: remaining children - * - * Note that it's wrong to embed the FunctionCallExpr itself as a child, - * because in 'COUNT(..) OVER (..)' the 'COUNT(..)' is not part of a standard aggregate - * computation and must not be substituted as such. However, the parameters of the - * analytic function call might reference the output of an aggregate computation - * and need to be substituted as such; example: COUNT(COUNT(..)) OVER (..) 
- */ -public class AnalyticExpr extends Expr { - private final static Logger LOG = LoggerFactory.getLogger(AnalyticExpr.class); - - private FunctionCallExpr fnCall_; - private final List<Expr> partitionExprs_; - // These elements are modified to point to the corresponding child exprs to keep them - // in sync through expr substitutions. - private List<OrderByElement> orderByElements_ = Lists.newArrayList(); - private AnalyticWindow window_; - - // If set, requires the window to be set to null in resetAnalysisState(). Required for - // proper substitution/cloning because standardization may set a window that is illegal - // in SQL, and hence, will fail analysis(). - private boolean resetWindow_ = false; - - // SQL string of this AnalyticExpr before standardization. Returned in toSqlImpl(). - private String sqlString_; - - private static String LEAD = "lead"; - private static String LAG = "lag"; - private static String FIRST_VALUE = "first_value"; - private static String LAST_VALUE = "last_value"; - private static String FIRST_VALUE_IGNORE_NULLS = "first_value_ignore_nulls"; - private static String LAST_VALUE_IGNORE_NULLS = "last_value_ignore_nulls"; - private static String RANK = "rank"; - private static String DENSERANK = "dense_rank"; - private static String ROWNUMBER = "row_number"; - private static String MIN = "min"; - private static String MAX = "max"; - private static String PERCENT_RANK = "percent_rank"; - private static String CUME_DIST = "cume_dist"; - private static String NTILE = "ntile"; - - // Internal function used to implement FIRST_VALUE with a window rewrite and - // additional null handling in the backend. - public static String FIRST_VALUE_REWRITE = "first_value_rewrite"; - - public AnalyticExpr(FunctionCallExpr fnCall, List<Expr> partitionExprs, - List<OrderByElement> orderByElements, AnalyticWindow window) { - Preconditions.checkNotNull(fnCall); - fnCall_ = fnCall; - partitionExprs_ = partitionExprs != null ? partitionExprs : new ArrayList<Expr>(); - if (orderByElements != null) orderByElements_.addAll(orderByElements); - window_ = window; - setChildren(); - } - - /** - * clone() c'tor - */ - protected AnalyticExpr(AnalyticExpr other) { - super(other); - fnCall_ = (FunctionCallExpr) other.fnCall_.clone(); - for (OrderByElement e: other.orderByElements_) { - orderByElements_.add(e.clone()); - } - partitionExprs_ = Expr.cloneList(other.partitionExprs_); - window_ = (other.window_ != null ? other.window_.clone() : null); - resetWindow_ = other.resetWindow_; - sqlString_ = other.sqlString_; - setChildren(); - } - - public FunctionCallExpr getFnCall() { return fnCall_; } - public List<Expr> getPartitionExprs() { return partitionExprs_; } - public List<OrderByElement> getOrderByElements() { return orderByElements_; } - public AnalyticWindow getWindow() { return window_; } - - @Override - public boolean equals(Object obj) { - if (!super.equals(obj)) return false; - AnalyticExpr o = (AnalyticExpr)obj; - if (!fnCall_.equals(o.getFnCall())) return false; - if ((window_ == null) != (o.window_ == null)) return false; - if (window_ != null) { - if (!window_.equals(o.window_)) return false; - } - return orderByElements_.equals(o.orderByElements_); - } - - /** - * Analytic exprs cannot be constant. 
- */ - @Override - public boolean isConstant() { return false; } - - @Override - public Expr clone() { return new AnalyticExpr(this); } - - @Override - public String toSqlImpl() { - if (sqlString_ != null) return sqlString_; - StringBuilder sb = new StringBuilder(); - sb.append(fnCall_.toSql()).append(" OVER ("); - boolean needsSpace = false; - if (!partitionExprs_.isEmpty()) { - sb.append("PARTITION BY ").append(Expr.toSql(partitionExprs_)); - needsSpace = true; - } - if (!orderByElements_.isEmpty()) { - List<String> orderByStrings = Lists.newArrayList(); - for (OrderByElement e: orderByElements_) { - orderByStrings.add(e.toSql()); - } - if (needsSpace) sb.append(" "); - sb.append("ORDER BY ").append(Joiner.on(", ").join(orderByStrings)); - needsSpace = true; - } - if (window_ != null) { - if (needsSpace) sb.append(" "); - sb.append(window_.toSql()); - } - sb.append(")"); - return sb.toString(); - } - - @Override - public String debugString() { - return Objects.toStringHelper(this) - .add("fn", getFnCall()) - .add("window", window_) - .addValue(super.debugString()) - .toString(); - } - - @Override - protected void toThrift(TExprNode msg) { - } - - private static boolean isAnalyticFn(Function fn) { - return fn instanceof AggregateFunction - && ((AggregateFunction) fn).isAnalyticFn(); - } - - private static boolean isAnalyticFn(Function fn, String fnName) { - return isAnalyticFn(fn) && fn.functionName().equals(fnName); - } - - public static boolean isAggregateFn(Function fn) { - return fn instanceof AggregateFunction - && ((AggregateFunction) fn).isAggregateFn(); - } - - public static boolean isPercentRankFn(Function fn) { - return isAnalyticFn(fn, PERCENT_RANK); - } - - public static boolean isCumeDistFn(Function fn) { - return isAnalyticFn(fn, CUME_DIST); - } - - public static boolean isNtileFn(Function fn) { - return isAnalyticFn(fn, NTILE); - } - - static private boolean isOffsetFn(Function fn) { - return isAnalyticFn(fn, LEAD) || isAnalyticFn(fn, LAG); - } - - static private boolean isMinMax(Function fn) { - return isAnalyticFn(fn, MIN) || isAnalyticFn(fn, MAX); - } - - static private boolean isRankingFn(Function fn) { - return isAnalyticFn(fn, RANK) || isAnalyticFn(fn, DENSERANK) || - isAnalyticFn(fn, ROWNUMBER); - } - - /** - * Rewrite the following analytic functions: - * percent_rank(), cume_dist() and ntile() - * - * Returns a new Expr if the analytic expr is rewritten, returns null if it's not one - * that we want to rewrite. - */ - public static Expr rewrite(AnalyticExpr analyticExpr) { - Function fn = analyticExpr.getFnCall().getFn(); - if (AnalyticExpr.isPercentRankFn(fn)) { - return createPercentRank(analyticExpr); - } else if (AnalyticExpr.isCumeDistFn(fn)) { - return createCumeDist(analyticExpr); - } else if (AnalyticExpr.isNtileFn(fn)) { - return createNtile(analyticExpr); - } - return null; - } - - /** - * Rewrite percent_rank() to the following: - * - * percent_rank() over([partition by clause] order by clause) - * = (Count == 1) ? 
0:(Rank - 1)/(Count - 1) - * where, - * Rank = rank() over([partition by clause] order by clause) - * Count = count() over([partition by clause]) - */ - private static Expr createPercentRank(AnalyticExpr analyticExpr) { - Preconditions.checkState( - AnalyticExpr.isPercentRankFn(analyticExpr.getFnCall().getFn())); - - NumericLiteral zero = new NumericLiteral(BigInteger.valueOf(0), ScalarType.BIGINT); - NumericLiteral one = new NumericLiteral(BigInteger.valueOf(1), ScalarType.BIGINT); - AnalyticExpr countExpr = create("count", analyticExpr, false, false); - AnalyticExpr rankExpr = create("rank", analyticExpr, true, false); - - ArithmeticExpr arithmeticRewrite = - new ArithmeticExpr(ArithmeticExpr.Operator.DIVIDE, - new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, rankExpr, one), - new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, countExpr, one)); - - List<Expr> ifParams = Lists.newArrayList(); - ifParams.add( - new BinaryPredicate(BinaryPredicate.Operator.EQ, one, countExpr)); - ifParams.add(zero); - ifParams.add(arithmeticRewrite); - FunctionCallExpr resultantRewrite = new FunctionCallExpr("if", ifParams); - - return resultantRewrite; - } - - /** - * Rewrite cume_dist() to the following: - * - * cume_dist() over([partition by clause] order by clause) - * = ((Count - Rank) + 1)/Count - * where, - * Rank = rank() over([partition by clause] order by clause DESC) - * Count = count() over([partition by clause]) - */ - private static Expr createCumeDist(AnalyticExpr analyticExpr) { - Preconditions.checkState( - AnalyticExpr.isCumeDistFn(analyticExpr.getFnCall().getFn())); - AnalyticExpr rankExpr = create("rank", analyticExpr, true, true); - AnalyticExpr countExpr = create("count", analyticExpr, false, false); - NumericLiteral one = new NumericLiteral(BigInteger.valueOf(1), ScalarType.BIGINT); - ArithmeticExpr arithmeticRewrite = - new ArithmeticExpr(ArithmeticExpr.Operator.DIVIDE, - new ArithmeticExpr(ArithmeticExpr.Operator.ADD, - new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, countExpr, rankExpr), - one), - countExpr); - return arithmeticRewrite; - } - - /** - * Rewrite ntile() to the following: - * - * ntile(B) over([partition by clause] order by clause) - * = floor(min(Count, B) * (RowNumber - 1)/Count) + 1 - * where, - * RowNumber = row_number() over([partition by clause] order by clause) - * Count = count() over([partition by clause]) - */ - private static Expr createNtile(AnalyticExpr analyticExpr) { - Preconditions.checkState( - AnalyticExpr.isNtileFn(analyticExpr.getFnCall().getFn())); - Expr bucketExpr = analyticExpr.getChild(0); - AnalyticExpr rowNumExpr = create("row_number", analyticExpr, true, false); - AnalyticExpr countExpr = create("count", analyticExpr, false, false); - - List<Expr> ifParams = Lists.newArrayList(); - ifParams.add( - new BinaryPredicate(BinaryPredicate.Operator.LT, bucketExpr, countExpr)); - ifParams.add(bucketExpr); - ifParams.add(countExpr); - - NumericLiteral one = new NumericLiteral(BigInteger.valueOf(1), ScalarType.BIGINT); - ArithmeticExpr minMultiplyRowMinusOne = - new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, - new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, rowNumExpr, one), - new FunctionCallExpr("if", ifParams)); - ArithmeticExpr divideAddOne = - new ArithmeticExpr(ArithmeticExpr.Operator.ADD, - new ArithmeticExpr(ArithmeticExpr.Operator.INT_DIVIDE, - minMultiplyRowMinusOne, countExpr), - one); - return divideAddOne; - } - - /** - * Create a new Analytic Expr and associate it with a new function. 
- * Takes a reference analytic expression and clones the partition expressions and the - * order by expressions if 'copyOrderBy' is set, optionally reversing them if - * 'reverseOrderBy' is set. The new function that it will be associated with is - * specified by fnName. - */ - private static AnalyticExpr create(String fnName, - AnalyticExpr referenceExpr, boolean copyOrderBy, boolean reverseOrderBy) { - FunctionCallExpr fnExpr = new FunctionCallExpr(fnName, new ArrayList<Expr>()); - fnExpr.setIsAnalyticFnCall(true); - List<OrderByElement> orderByElements = null; - if (copyOrderBy) { - if (reverseOrderBy) { - orderByElements = OrderByElement.reverse(referenceExpr.getOrderByElements()); - } else { - orderByElements = Lists.newArrayList(); - for (OrderByElement elem: referenceExpr.getOrderByElements()) { - orderByElements.add(elem.clone()); - } - } - } - AnalyticExpr analyticExpr = new AnalyticExpr(fnExpr, - Expr.cloneList(referenceExpr.getPartitionExprs()), orderByElements, null); - return analyticExpr; - } - - /** - * Checks that the value expr of an offset boundary of a RANGE window is compatible - * with orderingExprs (and that there's only a single ordering expr). - */ - private void checkRangeOffsetBoundaryExpr(AnalyticWindow.Boundary boundary) - throws AnalysisException { - Preconditions.checkState(boundary.getType().isOffset()); - if (orderByElements_.size() > 1) { - throw new AnalysisException("Only one ORDER BY expression allowed if used with " - + "a RANGE window with PRECEDING/FOLLOWING: " + toSql()); - } - Expr rangeExpr = boundary.getExpr(); - if (!Type.isImplicitlyCastable( - rangeExpr.getType(), orderByElements_.get(0).getExpr().getType(), false)) { - throw new AnalysisException( - "The value expression of a PRECEDING/FOLLOWING clause of a RANGE window must " - + "be implicitly convertible to the ORDER BY expression's type: " - + rangeExpr.toSql() + " cannot be implicitly converted to " - + orderByElements_.get(0).getExpr().getType().toSql()); - } - } - - /** - * Checks offset of lag()/lead().
- */ - void checkOffset(Analyzer analyzer) throws AnalysisException { - Preconditions.checkState(isOffsetFn(getFnCall().getFn())); - Preconditions.checkState(getFnCall().getChildren().size() > 1); - Expr offset = getFnCall().getChild(1); - Preconditions.checkState(offset.getType().isIntegerType()); - boolean isPosConstant = true; - if (!offset.isConstant()) { - isPosConstant = false; - } else { - try { - TColumnValue val = FeSupport.EvalConstExpr(offset, analyzer.getQueryCtx()); - if (TColumnValueUtil.getNumericVal(val) <= 0) isPosConstant = false; - } catch (InternalException exc) { - throw new AnalysisException( - "Couldn't evaluate LEAD/LAG offset: " + exc.getMessage()); - } - } - if (!isPosConstant) { - throw new AnalysisException( - "The offset parameter of LEAD/LAG must be a constant positive integer: " - + getFnCall().toSql()); - } - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - if (isAnalyzed_) return; - fnCall_.analyze(analyzer); - super.analyze(analyzer); - type_ = getFnCall().getType(); - - for (Expr e: partitionExprs_) { - if (e.isConstant()) { - throw new AnalysisException( - "Expressions in the PARTITION BY clause must not be constant: " - + e.toSql() + " (in " + toSql() + ")"); - } else if (e.getType().isComplexType()) { - throw new AnalysisException(String.format("PARTITION BY expression '%s' with " + - "complex type '%s' is not supported.", e.toSql(), - e.getType().toSql())); - } - } - for (OrderByElement e: orderByElements_) { - if (e.getExpr().isConstant()) { - throw new AnalysisException( - "Expressions in the ORDER BY clause must not be constant: " - + e.getExpr().toSql() + " (in " + toSql() + ")"); - } else if (e.getExpr().getType().isComplexType()) { - throw new AnalysisException(String.format("ORDER BY expression '%s' with " + - "complex type '%s' is not supported.", e.getExpr().toSql(), - e.getExpr().getType().toSql())); - } - } - - if (getFnCall().getParams().isDistinct()) { - throw new AnalysisException( - "DISTINCT not allowed in analytic function: " + getFnCall().toSql()); - } - - if (getFnCall().getParams().isIgnoreNulls()) { - String fnName = getFnCall().getFnName().getFunction(); - if (!fnName.equals(LAST_VALUE) && !fnName.equals(FIRST_VALUE)) { - throw new AnalysisException("Function " + fnName.toUpperCase() - + " does not accept the keyword IGNORE NULLS."); - } - } - - // check for correct composition of analytic expr - Function fn = getFnCall().getFn(); - if (!(fn instanceof AggregateFunction)) { - throw new AnalysisException( - "OVER clause requires aggregate or analytic function: " - + getFnCall().toSql()); - } - - // check for non-analytic aggregate functions - if (!isAnalyticFn(fn)) { - throw new AnalysisException( - String.format("Aggregate function '%s' not supported with OVER clause.", - getFnCall().toSql())); - } - - if (isAnalyticFn(fn) && !isAggregateFn(fn)) { - if (orderByElements_.isEmpty()) { - throw new AnalysisException( - "'" + getFnCall().toSql() + "' requires an ORDER BY clause"); - } - if ((isRankingFn(fn) || isOffsetFn(fn)) && window_ != null) { - throw new AnalysisException( - "Windowing clause not allowed with '" + getFnCall().toSql() + "'"); - } - if (isOffsetFn(fn) && getFnCall().getChildren().size() > 1) { - checkOffset(analyzer); - // check the default, which needs to be a constant at the moment - // TODO: remove this check when the backend can handle non-constants - if (getFnCall().getChildren().size() > 2) { - if (!getFnCall().getChild(2).isConstant()) { - throw new AnalysisException( - 
"The default parameter (parameter 3) of LEAD/LAG must be a constant: " - + getFnCall().toSql()); - } - } - } - if (isNtileFn(fn)) { - // TODO: IMPALA-2171:Remove this when ntile() can handle a non-constant argument. - if (!getFnCall().getChild(0).isConstant()) { - throw new AnalysisException("NTILE() requires a constant argument"); - } - // Check if argument value is zero or negative and throw an exception if found. - try { - TColumnValue bucketValue = - FeSupport.EvalConstExpr(getFnCall().getChild(0), analyzer.getQueryCtx()); - Long arg = bucketValue.getLong_val(); - if (arg <= 0) { - throw new AnalysisException("NTILE() requires a positive argument: " + arg); - } - } catch (InternalException e) { - throw new AnalysisException(e.toString()); - } - } - } - - if (window_ != null) { - if (orderByElements_.isEmpty()) { - throw new AnalysisException("Windowing clause requires ORDER BY clause: " - + toSql()); - } - window_.analyze(analyzer); - - if (!orderByElements_.isEmpty() - && window_.getType() == AnalyticWindow.Type.RANGE) { - // check that preceding/following ranges match ordering - if (window_.getLeftBoundary().getType().isOffset()) { - checkRangeOffsetBoundaryExpr(window_.getLeftBoundary()); - } - if (window_.getRightBoundary() != null - && window_.getRightBoundary().getType().isOffset()) { - checkRangeOffsetBoundaryExpr(window_.getRightBoundary()); - } - } - } - - // check nesting - if (TreeNode.contains(getChildren(), AnalyticExpr.class)) { - throw new AnalysisException( - "Nesting of analytic expressions is not allowed: " + toSql()); - } - sqlString_ = toSql(); - - standardize(analyzer); - - // min/max is not currently supported on sliding windows (i.e. start bound is not - // unbounded). - if (window_ != null && isMinMax(fn) && - window_.getLeftBoundary().getType() != BoundaryType.UNBOUNDED_PRECEDING) { - throw new AnalysisException( - "'" + getFnCall().toSql() + "' is only supported with an " - + "UNBOUNDED PRECEDING start bound."); - } - - setChildren(); - } - - /** - * If necessary, rewrites the analytic function, window, and/or order-by elements into - * a standard format for the purpose of simpler backend execution, as follows: - * 1. row_number(): - * Set a window from UNBOUNDED PRECEDING to CURRENT_ROW. - * 2. lead()/lag(): - * Explicitly set the default arguments to for BE simplicity. - * Set a window for lead(): UNBOUNDED PRECEDING to OFFSET FOLLOWING. - * Set a window for lag(): UNBOUNDED PRECEDING to OFFSET PRECEDING. - * 3. FIRST_VALUE without UNBOUNDED PRECEDING or IGNORE NULLS gets rewritten to use a - * different window and function. There are a few cases: - * a) Start bound is X FOLLOWING or CURRENT ROW (X=0): - * Use 'last_value' with a window where both bounds are X FOLLOWING (or - * CURRENT ROW). Setting the start bound to X following is necessary because the - * X rows at the end of a partition have no rows in their window. Note that X - * FOLLOWING could be rewritten as lead(X) but that would not work for CURRENT - * ROW. - * b) Start bound is X PRECEDING and end bound is CURRENT ROW or FOLLOWING: - * Use 'first_value_rewrite' and a window with an end bound X PRECEDING. An - * extra parameter '-1' is added to indicate to the backend that NULLs should - * not be added for the first X rows. - * c) Start bound is X PRECEDING and end bound is Y PRECEDING: - * Use 'first_value_rewrite' and a window with an end bound X PRECEDING. The - * first Y rows in a partition have empty windows and should be NULL. 
An extra - * parameter with the integer constant Y is added to indicate to the backend - * that NULLs should be added for the first Y rows. - * The performance optimization here and in 5. below cannot be applied in the case of - * IGNORE NULLS because they change what values appear in the window, which in the - * IGNORE NULLS case could mean the correct value to return isn't even in the window, - * eg. if all of the values in the rewritten window are NULL but one of the values in - * the original window isn't. - * 4. Start bound is not UNBOUNDED PRECEDING and either the end bound is UNBOUNDED - * FOLLOWING or the function is first_value(... ignore nulls): - * Reverse the ordering and window, and flip first_value() and last_value(). - * 5. first_value() with UNBOUNDED PRECEDING and not IGNORE NULLS: - * Set the end boundary to CURRENT_ROW. - * 6. Rewrite IGNORE NULLS as regular FunctionCallExprs with '_ignore_nulls' - * appended to the function name, because the BE implements them as different - * functions. - * 7. Explicitly set the default window if no window was given but there - * are order-by elements. - * 8. first/last_value() with RANGE window: - * Rewrite as a ROWS window. - */ - private void standardize(Analyzer analyzer) { - FunctionName analyticFnName = getFnCall().getFnName(); - - // 1. Set a window from UNBOUNDED PRECEDING to CURRENT_ROW for row_number(). - if (analyticFnName.getFunction().equals(ROWNUMBER)) { - Preconditions.checkState(window_ == null, "Unexpected window set for row_numer()"); - window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS, - new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null), - new Boundary(BoundaryType.CURRENT_ROW, null)); - resetWindow_ = true; - return; - } - - // 2. Explicitly set the default arguments to lead()/lag() for BE simplicity. - // Set a window for lead(): UNBOUNDED PRECEDING to OFFSET FOLLOWING, - // Set a window for lag(): UNBOUNDED PRECEDING to OFFSET PRECEDING. - if (isOffsetFn(getFnCall().getFn())) { - Preconditions.checkState(window_ == null); - - // If necessary, create a new fn call with the default args explicitly set. - List<Expr> newExprParams = null; - if (getFnCall().getChildren().size() == 1) { - newExprParams = Lists.newArrayListWithExpectedSize(3); - newExprParams.addAll(getFnCall().getChildren()); - // Default offset is 1. - newExprParams.add(new NumericLiteral(BigDecimal.valueOf(1))); - // Default default value is NULL. - newExprParams.add(new NullLiteral()); - } else if (getFnCall().getChildren().size() == 2) { - newExprParams = Lists.newArrayListWithExpectedSize(3); - newExprParams.addAll(getFnCall().getChildren()); - // Default default value is NULL. - newExprParams.add(new NullLiteral()); - } else { - Preconditions.checkState(getFnCall().getChildren().size() == 3); - } - if (newExprParams != null) { - fnCall_ = new FunctionCallExpr(getFnCall().getFnName(), - new FunctionParams(newExprParams)); - fnCall_.setIsAnalyticFnCall(true); - fnCall_.analyzeNoThrow(analyzer); - } - - // Set the window. - BoundaryType rightBoundaryType = BoundaryType.FOLLOWING; - if (analyticFnName.getFunction().equals(LAG)) { - rightBoundaryType = BoundaryType.PRECEDING; - } - window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS, - new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null), - new Boundary(rightBoundaryType, getOffsetExpr(getFnCall()))); - try { - window_.analyze(analyzer); - } catch (AnalysisException e) { - throw new IllegalStateException(e); - } - resetWindow_ = true; - return; - } - - // 3. 
- if (analyticFnName.getFunction().equals(FIRST_VALUE) - && window_ != null - && window_.getLeftBoundary().getType() != BoundaryType.UNBOUNDED_PRECEDING - && !getFnCall().getParams().isIgnoreNulls()) { - if (window_.getLeftBoundary().getType() != BoundaryType.PRECEDING) { - window_ = new AnalyticWindow(window_.getType(), window_.getLeftBoundary(), - window_.getLeftBoundary()); - fnCall_ = new FunctionCallExpr(new FunctionName(LAST_VALUE), - getFnCall().getParams()); - } else { - List<Expr> paramExprs = Expr.cloneList(getFnCall().getParams().exprs()); - if (window_.getRightBoundary().getType() == BoundaryType.PRECEDING) { - // The number of rows preceding for the end bound determines the number of - // rows at the beginning of each partition that should have a NULL value. - paramExprs.add(new NumericLiteral(window_.getRightBoundary().getOffsetValue(), - Type.BIGINT)); - } else { - // -1 indicates that no NULL values are inserted even though we set the end - // bound to the start bound (which is PRECEDING) below; this is different from - // the default behavior of windows with an end bound PRECEDING. - paramExprs.add(new NumericLiteral(BigInteger.valueOf(-1), Type.BIGINT)); - } - - window_ = new AnalyticWindow(window_.getType(), - new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null), - window_.getLeftBoundary()); - fnCall_ = new FunctionCallExpr(new FunctionName(FIRST_VALUE_REWRITE), - new FunctionParams(paramExprs)); - fnCall_.setIsInternalFnCall(true); - } - fnCall_.setIsAnalyticFnCall(true); - fnCall_.analyzeNoThrow(analyzer); - // Use getType() instead if getReturnType() because wildcard decimals - // have only been resolved in the former. - type_ = fnCall_.getType(); - analyticFnName = getFnCall().getFnName(); - } - - // 4. Reverse the ordering and window for windows not starting with UNBOUNDED - // PRECEDING and either: ending with UNBOUNDED FOLLOWING or - // first_value(... ignore nulls) - if (window_ != null - && window_.getLeftBoundary().getType() != BoundaryType.UNBOUNDED_PRECEDING - && (window_.getRightBoundary().getType() == BoundaryType.UNBOUNDED_FOLLOWING - || (analyticFnName.getFunction().equals(FIRST_VALUE) - && getFnCall().getParams().isIgnoreNulls()))) { - orderByElements_ = OrderByElement.reverse(orderByElements_); - window_ = window_.reverse(); - - // Also flip first_value()/last_value(). For other analytic functions there is no - // need to also change the function. - FunctionName reversedFnName = null; - if (analyticFnName.getFunction().equals(FIRST_VALUE)) { - reversedFnName = new FunctionName(LAST_VALUE); - } else if (analyticFnName.getFunction().equals(LAST_VALUE)) { - reversedFnName = new FunctionName(FIRST_VALUE); - } - if (reversedFnName != null) { - fnCall_ = new FunctionCallExpr(reversedFnName, getFnCall().getParams()); - fnCall_.setIsAnalyticFnCall(true); - fnCall_.analyzeNoThrow(analyzer); - } - analyticFnName = getFnCall().getFnName(); - } - - // 5. Set the start boundary to CURRENT_ROW for first_value() if the end boundary - // is UNBOUNDED_PRECEDING and IGNORE NULLS is not set. - if (analyticFnName.getFunction().equals(FIRST_VALUE) - && window_ != null - && window_.getLeftBoundary().getType() == BoundaryType.UNBOUNDED_PRECEDING - && window_.getRightBoundary().getType() != BoundaryType.PRECEDING - && !getFnCall().getParams().isIgnoreNulls()) { - window_.setRightBoundary(new Boundary(BoundaryType.CURRENT_ROW, null)); - } - - // 6. Set the default window. 
- if (!orderByElements_.isEmpty() && window_ == null) { - window_ = AnalyticWindow.DEFAULT_WINDOW; - resetWindow_ = true; - } - - // 7. Change first_value/last_value RANGE windows to ROWS. - if ((analyticFnName.getFunction().equals(FIRST_VALUE) - || analyticFnName.getFunction().equals(LAST_VALUE)) - && window_ != null - && window_.getType() == AnalyticWindow.Type.RANGE) { - window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS, window_.getLeftBoundary(), - window_.getRightBoundary()); - } - - // 8. Append IGNORE NULLS to fn name if set. - if (getFnCall().getParams().isIgnoreNulls()) { - if (analyticFnName.getFunction().equals(LAST_VALUE)) { - fnCall_ = new FunctionCallExpr(new FunctionName(LAST_VALUE_IGNORE_NULLS), - getFnCall().getParams()); - } else { - Preconditions.checkState(analyticFnName.getFunction().equals(FIRST_VALUE)); - fnCall_ = new FunctionCallExpr(new FunctionName(FIRST_VALUE_IGNORE_NULLS), - getFnCall().getParams()); - } - - fnCall_.setIsAnalyticFnCall(true); - fnCall_.setIsInternalFnCall(true); - fnCall_.analyzeNoThrow(analyzer); - analyticFnName = getFnCall().getFnName(); - Preconditions.checkState(type_.equals(fnCall_.getType())); - } - } - - /** - * Returns the explicit or implicit offset of an analytic function call. - */ - private Expr getOffsetExpr(FunctionCallExpr offsetFnCall) { - Preconditions.checkState(isOffsetFn(getFnCall().getFn())); - if (offsetFnCall.getChild(1) != null) return offsetFnCall.getChild(1); - // The default offset is 1. - return new NumericLiteral(BigDecimal.valueOf(1)); - } - - /** - * Keep fnCall_, partitionExprs_ and orderByElements_ in sync with children_. - */ - private void syncWithChildren() { - int numArgs = fnCall_.getChildren().size(); - for (int i = 0; i < numArgs; ++i) { - fnCall_.setChild(i, getChild(i)); - } - int numPartitionExprs = partitionExprs_.size(); - for (int i = 0; i < numPartitionExprs; ++i) { - partitionExprs_.set(i, getChild(numArgs + i)); - } - for (int i = 0; i < orderByElements_.size(); ++i) { - orderByElements_.get(i).setExpr(getChild(numArgs + numPartitionExprs + i)); - } - } - - /** - * Populate children_ from fnCall_, partitionExprs_, orderByElements_ - */ - private void setChildren() { - getChildren().clear(); - addChildren(fnCall_.getChildren()); - addChildren(partitionExprs_); - for (OrderByElement e: orderByElements_) { - addChild(e.getExpr()); - } - if (window_ != null) { - if (window_.getLeftBoundary().getExpr() != null) { - addChild(window_.getLeftBoundary().getExpr()); - } - if (window_.getRightBoundary() != null - && window_.getRightBoundary().getExpr() != null) { - addChild(window_.getRightBoundary().getExpr()); - } - } - } - - @Override - protected void resetAnalysisState() { - super.resetAnalysisState(); - fnCall_.resetAnalysisState(); - if (resetWindow_) window_ = null; - resetWindow_ = false; - // sync with children, now that they've been reset - syncWithChildren(); - } - - @Override - protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) - throws AnalysisException { - Expr e = super.substituteImpl(smap, analyzer); - if (!(e instanceof AnalyticExpr)) return e; - // Re-sync state after possible child substitution. - ((AnalyticExpr) e).syncWithChildren(); - return e; - } -}
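The percent_rank()/cume_dist()/ntile() rewrites in the deleted AnalyticExpr.java above reduce each function to arithmetic over rank(), count() and row_number(). A minimal standalone Java sketch of those formulas evaluated over a toy, already-ordered partition follows; the class and variable names are illustrative only (not Impala code), and plain integer division stands in for the planner's INT_DIVIDE.

public class AnalyticRewriteSketch {
  public static void main(String[] args) {
    long[] orderKeys = {10, 20, 20, 30, 40};  // one partition, already sorted by the ORDER BY key
    int count = orderKeys.length;             // count() over([partition by clause])
    int buckets = 2;                          // B in ntile(B)

    for (int i = 0; i < count; ++i) {
      long rowNumber = i + 1;                 // row_number() over(... order by clause)
      long rank = 1;                          // rank(): 1 + rows strictly before the current key
      long rankDesc = 1;                      // rank() over the reversed ordering (used by cume_dist)
      for (long k : orderKeys) {
        if (k < orderKeys[i]) ++rank;
        if (k > orderKeys[i]) ++rankDesc;
      }

      // percent_rank() = (Count == 1) ? 0 : (Rank - 1) / (Count - 1)
      double percentRank = (count == 1) ? 0 : (double) (rank - 1) / (count - 1);
      // cume_dist() = ((Count - Rank) + 1) / Count, with Rank taken over the reversed order
      double cumeDist = (double) ((count - rankDesc) + 1) / count;
      // ntile(B) = floor(min(Count, B) * (RowNumber - 1) / Count) + 1  (integer division)
      long ntile = (Math.min(count, buckets) * (rowNumber - 1)) / count + 1;

      System.out.printf("key=%d rank=%d percent_rank=%.2f cume_dist=%.2f ntile=%d%n",
          orderKeys[i], rank, percentRank, cumeDist, ntile);
    }
  }
}

For the five keys above this prints percent_rank 0, 0.25, 0.25, 0.75, 1.0 and ntile values 1, 1, 1, 2, 2, which is the behavior the comments in createPercentRank()/createCumeDist()/createNtile() describe.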
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java deleted file mode 100644 index d0d1a85..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java +++ /dev/null @@ -1,199 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.catalog.Type; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Encapsulates the analytic functions found in a single select block plus - * the corresponding analytic result tuple and its substitution map. - */ -public class AnalyticInfo extends AggregateInfoBase { - private final static Logger LOG = LoggerFactory.getLogger(AnalyticInfo.class); - - // All unique analytic exprs of a select block. Used to populate - // super.aggregateExprs_ based on AnalyticExpr.getFnCall() for each analytic expr - // in this list. - private final ArrayList<Expr> analyticExprs_; - - // Intersection of the partition exps of all the analytic functions. - private final List<Expr> commonPartitionExprs_; - - // map from analyticExprs_ to their corresponding analytic tuple slotrefs - private final ExprSubstitutionMap analyticTupleSmap_; - - private AnalyticInfo(ArrayList<Expr> analyticExprs) { - super(new ArrayList<Expr>(), new ArrayList<FunctionCallExpr>()); - analyticExprs_ = Expr.cloneList(analyticExprs); - // Extract the analytic function calls for each analytic expr. - for (Expr analyticExpr: analyticExprs) { - aggregateExprs_.add(((AnalyticExpr) analyticExpr).getFnCall()); - } - analyticTupleSmap_ = new ExprSubstitutionMap(); - commonPartitionExprs_ = computeCommonPartitionExprs(); - } - - /** - * C'tor for cloning. - */ - private AnalyticInfo(AnalyticInfo other) { - super(other); - analyticExprs_ = - (other.analyticExprs_ != null) ? Expr.cloneList(other.analyticExprs_) : null; - analyticTupleSmap_ = other.analyticTupleSmap_.clone(); - commonPartitionExprs_ = Expr.cloneList(other.commonPartitionExprs_); - } - - public ArrayList<Expr> getAnalyticExprs() { return analyticExprs_; } - public ExprSubstitutionMap getSmap() { return analyticTupleSmap_; } - public List<Expr> getCommonPartitionExprs() { return commonPartitionExprs_; } - - /** - * Creates complete AnalyticInfo for analyticExprs, including tuple descriptors and - * smaps. 
- */ - static public AnalyticInfo create( - ArrayList<Expr> analyticExprs, Analyzer analyzer) { - Preconditions.checkState(analyticExprs != null && !analyticExprs.isEmpty()); - Expr.removeDuplicates(analyticExprs); - AnalyticInfo result = new AnalyticInfo(analyticExprs); - result.createTupleDescs(analyzer); - - // The tuple descriptors are logical. Their slots are remapped to physical tuples - // during plan generation. - result.outputTupleDesc_.setIsMaterialized(false); - result.intermediateTupleDesc_.setIsMaterialized(false); - - // Populate analyticTupleSmap_ - Preconditions.checkState(analyticExprs.size() == - result.outputTupleDesc_.getSlots().size()); - for (int i = 0; i < analyticExprs.size(); ++i) { - result.analyticTupleSmap_.put(result.analyticExprs_.get(i), - new SlotRef(result.outputTupleDesc_.getSlots().get(i))); - result.outputTupleDesc_.getSlots().get(i).setSourceExpr( - result.analyticExprs_.get(i)); - } - LOG.trace("analytictuple=" + result.outputTupleDesc_.debugString()); - LOG.trace("analytictuplesmap=" + result.analyticTupleSmap_.debugString()); - LOG.trace("analytic info:\n" + result.debugString()); - return result; - } - - /** - * Returns the intersection of the partition exprs of all the - * analytic functions. - */ - private List<Expr> computeCommonPartitionExprs() { - List<Expr> result = Lists.newArrayList(); - for (Expr analyticExpr: analyticExprs_) { - Preconditions.checkState(analyticExpr.isAnalyzed_); - List<Expr> partitionExprs = ((AnalyticExpr) analyticExpr).getPartitionExprs(); - if (partitionExprs == null) continue; - if (result.isEmpty()) { - result.addAll(partitionExprs); - } else { - result.retainAll(partitionExprs); - if (result.isEmpty()) break; - } - } - return result; - } - - /** - * Append ids of all slots that are being referenced in the process - * of performing the analytic computation described by this AnalyticInfo. - */ - public void getRefdSlots(List<SlotId> ids) { - Preconditions.checkState(intermediateTupleDesc_ != null); - Expr.getIds(analyticExprs_, null, ids); - // The backend assumes that the entire intermediateTupleDesc is materialized - for (SlotDescriptor slotDesc: intermediateTupleDesc_.getSlots()) { - ids.add(slotDesc.getId()); - } - } - - @Override - public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap) { - materializedSlots_.clear(); - List<Expr> exprs = Lists.newArrayList(); - for (int i = 0; i < analyticExprs_.size(); ++i) { - SlotDescriptor outputSlotDesc = outputTupleDesc_.getSlots().get(i); - if (!outputSlotDesc.isMaterialized()) continue; - intermediateTupleDesc_.getSlots().get(i).setIsMaterialized(true); - exprs.add(analyticExprs_.get(i)); - materializedSlots_.add(i); - } - List<Expr> resolvedExprs = Expr.substituteList(exprs, smap, analyzer, false); - analyzer.materializeSlots(resolvedExprs); - } - - /** - * Validates internal state: Checks that the number of materialized slots of the - * analytic tuple corresponds to the number of materialized analytic functions. Also - * checks that the return types of the analytic exprs correspond to the slots in the - * analytic tuple. - */ - public void checkConsistency() { - ArrayList<SlotDescriptor> slots = intermediateTupleDesc_.getSlots(); - - // Check materialized slots. 
- int numMaterializedSlots = 0; - for (SlotDescriptor slotDesc: slots) { - if (slotDesc.isMaterialized()) ++numMaterializedSlots; - } - Preconditions.checkState(numMaterializedSlots == - materializedSlots_.size()); - - // Check that analytic expr return types match the slot descriptors. - int slotIdx = 0; - for (int i = 0; i < analyticExprs_.size(); ++i) { - Expr analyticExpr = analyticExprs_.get(i); - Type slotType = slots.get(slotIdx).getType(); - Preconditions.checkState(analyticExpr.getType().equals(slotType), - String.format("Analytic expr %s returns type %s but its analytic tuple " + - "slot has type %s", analyticExpr.toSql(), - analyticExpr.getType().toString(), slotType.toString())); - ++slotIdx; - } - } - - @Override - public String debugString() { - StringBuilder out = new StringBuilder(super.debugString()); - out.append(Objects.toStringHelper(this) - .add("analytic_exprs", Expr.debugString(analyticExprs_)) - .add("smap", analyticTupleSmap_.debugString()) - .toString()); - return out.toString(); - } - - @Override - protected String tupleDebugName() { return "analytic-tuple"; } - - @Override - public AnalyticInfo clone() { return new AnalyticInfo(this); } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java deleted file mode 100644 index 68558da..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java +++ /dev/null @@ -1,417 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import java.math.BigDecimal; - -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.service.FeSupport; -import com.cloudera.impala.thrift.TAnalyticWindow; -import com.cloudera.impala.thrift.TAnalyticWindowBoundary; -import com.cloudera.impala.thrift.TAnalyticWindowBoundaryType; -import com.cloudera.impala.thrift.TAnalyticWindowType; -import com.cloudera.impala.thrift.TColumnValue; -import com.cloudera.impala.util.TColumnValueUtil; -import com.google.common.base.Preconditions; - - -/** - * Windowing clause of an analytic expr - * Both left and right boundaries are always non-null after analyze(). 
- */ -public class AnalyticWindow { - // default window used when an analytic expr was given an order by but no window - public static final AnalyticWindow DEFAULT_WINDOW = new AnalyticWindow(Type.RANGE, - new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null), - new Boundary(BoundaryType.CURRENT_ROW, null)); - - enum Type { - ROWS("ROWS"), - RANGE("RANGE"); - - private final String description_; - - private Type(String d) { - description_ = d; - } - - @Override - public String toString() { return description_; } - public TAnalyticWindowType toThrift() { - return this == ROWS ? TAnalyticWindowType.ROWS : TAnalyticWindowType.RANGE; - } - } - - enum BoundaryType { - UNBOUNDED_PRECEDING("UNBOUNDED PRECEDING"), - UNBOUNDED_FOLLOWING("UNBOUNDED FOLLOWING"), - CURRENT_ROW("CURRENT ROW"), - PRECEDING("PRECEDING"), - FOLLOWING("FOLLOWING"); - - private final String description_; - - private BoundaryType(String d) { - description_ = d; - } - - @Override - public String toString() { return description_; } - public TAnalyticWindowBoundaryType toThrift() { - Preconditions.checkState(!isAbsolutePos()); - if (this == CURRENT_ROW) { - return TAnalyticWindowBoundaryType.CURRENT_ROW; - } else if (this == PRECEDING) { - return TAnalyticWindowBoundaryType.PRECEDING; - } else if (this == FOLLOWING) { - return TAnalyticWindowBoundaryType.FOLLOWING; - } - return null; - } - - public boolean isAbsolutePos() { - return this == UNBOUNDED_PRECEDING || this == UNBOUNDED_FOLLOWING; - } - - public boolean isOffset() { - return this == PRECEDING || this == FOLLOWING; - } - - public boolean isPreceding() { - return this == UNBOUNDED_PRECEDING || this == PRECEDING; - } - - public boolean isFollowing() { - return this == UNBOUNDED_FOLLOWING || this == FOLLOWING; - } - - public BoundaryType converse() { - switch (this) { - case UNBOUNDED_PRECEDING: return UNBOUNDED_FOLLOWING; - case UNBOUNDED_FOLLOWING: return UNBOUNDED_PRECEDING; - case PRECEDING: return FOLLOWING; - case FOLLOWING: return PRECEDING; - default: return CURRENT_ROW; - } - } - } - - public static class Boundary { - private final BoundaryType type_; - - // Offset expr. Only set for PRECEDING/FOLLOWING. Needed for toSql(). - private final Expr expr_; - - // The offset value. Set during analysis after evaluating expr_. Integral valued - // for ROWS windows. 
- private BigDecimal offsetValue_; - - public BoundaryType getType() { return type_; } - public Expr getExpr() { return expr_; } - public BigDecimal getOffsetValue() { return offsetValue_; } - - public Boundary(BoundaryType type, Expr e) { - this(type, e, null); - } - - // c'tor used by clone() - private Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) { - Preconditions.checkState( - (type.isOffset() && e != null) - || (!type.isOffset() && e == null)); - type_ = type; - expr_ = e; - offsetValue_ = offsetValue; - } - - public String toSql() { - StringBuilder sb = new StringBuilder(); - if (expr_ != null) sb.append(expr_.toSql()).append(" "); - sb.append(type_.toString()); - return sb.toString(); - } - - public TAnalyticWindowBoundary toThrift(Type windowType) { - TAnalyticWindowBoundary result = new TAnalyticWindowBoundary(type_.toThrift()); - if (type_.isOffset() && windowType == Type.ROWS) { - result.setRows_offset_value(offsetValue_.longValue()); - } - // TODO: range windows need range_offset_predicate - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) return false; - if (obj.getClass() != this.getClass()) return false; - Boundary o = (Boundary)obj; - boolean exprEqual = (expr_ == null) == (o.expr_ == null); - if (exprEqual && expr_ != null) exprEqual = expr_.equals(o.expr_); - return type_ == o.type_ && exprEqual; - } - - public Boundary converse() { - Boundary result = new Boundary(type_.converse(), - (expr_ != null) ? expr_.clone() : null); - result.offsetValue_ = offsetValue_; - return result; - } - - @Override - public Boundary clone() { - return new Boundary(type_, expr_ != null ? expr_.clone() : null, offsetValue_); - } - - public void analyze(Analyzer analyzer) throws AnalysisException { - if (expr_ != null) expr_.analyze(analyzer); - } - } - - private final Type type_; - private final Boundary leftBoundary_; - private Boundary rightBoundary_; // may be null before analyze() - private String toSqlString_; // cached after analysis - - public Type getType() { return type_; } - public Boundary getLeftBoundary() { return leftBoundary_; } - public Boundary getRightBoundary() { return rightBoundary_; } - public Boundary setRightBoundary(Boundary b) { return rightBoundary_ = b; } - - public AnalyticWindow(Type type, Boundary b) { - type_ = type; - Preconditions.checkNotNull(b); - leftBoundary_ = b; - rightBoundary_ = null; - } - - public AnalyticWindow(Type type, Boundary l, Boundary r) { - type_ = type; - Preconditions.checkNotNull(l); - leftBoundary_ = l; - Preconditions.checkNotNull(r); - rightBoundary_ = r; - } - - /** - * Clone c'tor - */ - private AnalyticWindow(AnalyticWindow other) { - type_ = other.type_; - Preconditions.checkNotNull(other.leftBoundary_); - leftBoundary_ = other.leftBoundary_.clone(); - if (other.rightBoundary_ != null) { - rightBoundary_ = other.rightBoundary_.clone(); - } - toSqlString_ = other.toSqlString_; // safe to share - } - - public AnalyticWindow reverse() { - Boundary newRightBoundary = leftBoundary_.converse(); - Boundary newLeftBoundary = null; - if (rightBoundary_ == null) { - newLeftBoundary = new Boundary(leftBoundary_.getType(), null); - } else { - newLeftBoundary = rightBoundary_.converse(); - } - return new AnalyticWindow(type_, newLeftBoundary, newRightBoundary); - } - - public String toSql() { - if (toSqlString_ != null) return toSqlString_; - StringBuilder sb = new StringBuilder(); - sb.append(type_.toString()).append(" "); - if (rightBoundary_ == null) { - 
sb.append(leftBoundary_.toSql()); - } else { - sb.append("BETWEEN ").append(leftBoundary_.toSql()).append(" AND "); - sb.append(rightBoundary_.toSql()); - } - return sb.toString(); - } - - public TAnalyticWindow toThrift() { - TAnalyticWindow result = new TAnalyticWindow(type_.toThrift()); - if (leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) { - result.setWindow_start(leftBoundary_.toThrift(type_)); - } - Preconditions.checkNotNull(rightBoundary_); - if (rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) { - result.setWindow_end(rightBoundary_.toThrift(type_)); - } - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) return false; - if (obj.getClass() != this.getClass()) return false; - AnalyticWindow o = (AnalyticWindow)obj; - boolean rightBoundaryEqual = - (rightBoundary_ == null) == (o.rightBoundary_ == null); - if (rightBoundaryEqual && rightBoundary_ != null) { - rightBoundaryEqual = rightBoundary_.equals(o.rightBoundary_); - } - return type_ == o.type_ - && leftBoundary_.equals(o.leftBoundary_) - && rightBoundaryEqual; - } - - @Override - public AnalyticWindow clone() { return new AnalyticWindow(this); } - - /** - * Semantic analysis for expr of a PRECEDING/FOLLOWING clause. - */ - private void checkOffsetExpr(Analyzer analyzer, Boundary boundary) - throws AnalysisException { - Preconditions.checkState(boundary.getType().isOffset()); - Expr e = boundary.getExpr(); - Preconditions.checkNotNull(e); - boolean isPos = true; - Double val = null; - if (e.isConstant() && e.getType().isNumericType()) { - try { - val = TColumnValueUtil.getNumericVal( - FeSupport.EvalConstExpr(e, analyzer.getQueryCtx())); - if (val <= 0) isPos = false; - } catch (InternalException exc) { - throw new AnalysisException( - "Couldn't evaluate PRECEDING/FOLLOWING expression: " + exc.getMessage()); - } - } - - if (type_ == Type.ROWS) { - if (!e.isConstant() || !e.getType().isIntegerType() || !isPos) { - throw new AnalysisException( - "For ROWS window, the value of a PRECEDING/FOLLOWING offset must be a " - + "constant positive integer: " + boundary.toSql()); - } - Preconditions.checkNotNull(val); - boundary.offsetValue_ = new BigDecimal(val.longValue()); - } else { - if (!e.isConstant() || !e.getType().isNumericType() || !isPos) { - throw new AnalysisException( - "For RANGE window, the value of a PRECEDING/FOLLOWING offset must be a " - + "constant positive number: " + boundary.toSql()); - } - boundary.offsetValue_ = new BigDecimal(val); - } - } - - /** - * Check that b1 <= b2. 
- */ - private void checkOffsetBoundaries(Analyzer analyzer, Boundary b1, Boundary b2) - throws AnalysisException { - Preconditions.checkState(b1.getType().isOffset()); - Preconditions.checkState(b2.getType().isOffset()); - Expr e1 = b1.getExpr(); - Preconditions.checkState( - e1 != null && e1.isConstant() && e1.getType().isNumericType()); - Expr e2 = b2.getExpr(); - Preconditions.checkState( - e2 != null && e2.isConstant() && e2.getType().isNumericType()); - - try { - TColumnValue val1 = FeSupport.EvalConstExpr(e1, analyzer.getQueryCtx()); - TColumnValue val2 = FeSupport.EvalConstExpr(e2, analyzer.getQueryCtx()); - double left = TColumnValueUtil.getNumericVal(val1); - double right = TColumnValueUtil.getNumericVal(val2); - if (left > right) { - throw new AnalysisException( - "Offset boundaries are in the wrong order: " + toSql()); - } - } catch (InternalException exc) { - throw new AnalysisException( - "Couldn't evaluate PRECEDING/FOLLOWING expression: " + exc.getMessage()); - } - - } - - public void analyze(Analyzer analyzer) throws AnalysisException { - leftBoundary_.analyze(analyzer); - if (rightBoundary_ != null) rightBoundary_.analyze(analyzer); - - if (leftBoundary_.getType() == BoundaryType.UNBOUNDED_FOLLOWING) { - throw new AnalysisException( - leftBoundary_.getType().toString() + " is only allowed for upper bound of " - + "BETWEEN"); - } - if (rightBoundary_ != null - && rightBoundary_.getType() == BoundaryType.UNBOUNDED_PRECEDING) { - throw new AnalysisException( - rightBoundary_.getType().toString() + " is only allowed for lower bound of " - + "BETWEEN"); - } - - // TODO: Remove when RANGE windows with offset boundaries are supported. - if (type_ == Type.RANGE) { - if (leftBoundary_.type_.isOffset() - || (rightBoundary_ != null && rightBoundary_.type_.isOffset()) - || (leftBoundary_.type_ == BoundaryType.CURRENT_ROW - && (rightBoundary_ == null - || rightBoundary_.type_ == BoundaryType.CURRENT_ROW))) { - throw new AnalysisException( - "RANGE is only supported with both the lower and upper bounds UNBOUNDED or" - + " one UNBOUNDED and the other CURRENT ROW."); - } - } - - if (rightBoundary_ == null && leftBoundary_.getType() == BoundaryType.FOLLOWING) { - throw new AnalysisException( - leftBoundary_.getType().toString() + " requires a BETWEEN clause"); - } - - if (leftBoundary_.getType().isOffset()) checkOffsetExpr(analyzer, leftBoundary_); - if (rightBoundary_ == null) { - // set right boundary to implied value, but make sure to cache toSql string - // beforehand - toSqlString_ = toSql(); - rightBoundary_ = new Boundary(BoundaryType.CURRENT_ROW, null); - return; - } - if (rightBoundary_.getType().isOffset()) checkOffsetExpr(analyzer, rightBoundary_); - - if (leftBoundary_.getType() == BoundaryType.FOLLOWING) { - if (rightBoundary_.getType() != BoundaryType.FOLLOWING - && rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) { - throw new AnalysisException( - "A lower window bound of " + BoundaryType.FOLLOWING.toString() - + " requires that the upper bound also be " - + BoundaryType.FOLLOWING.toString()); - } - if (rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) { - checkOffsetBoundaries(analyzer, leftBoundary_, rightBoundary_); - } - } - - if (rightBoundary_.getType() == BoundaryType.PRECEDING) { - if (leftBoundary_.getType() != BoundaryType.PRECEDING - && leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) { - throw new AnalysisException( - "An upper window bound of " + BoundaryType.PRECEDING.toString() - + " requires that the lower bound also be " 
- + BoundaryType.PRECEDING.toString()); - } - if (leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) { - checkOffsetBoundaries(analyzer, rightBoundary_, leftBoundary_); - } - } - } -}
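The window-reversal step used by AnalyticExpr.standardize() (step 4 in its javadoc) relies on AnalyticWindow.reverse() above: the two bounds swap and each boundary type is replaced by its converse. A toy Java sketch of that mapping follows; the enum values mirror the deleted class, but the sketch itself is illustrative and not Impala code.

public class WindowReverseSketch {
  enum BoundaryType {
    UNBOUNDED_PRECEDING, UNBOUNDED_FOLLOWING, CURRENT_ROW, PRECEDING, FOLLOWING;

    BoundaryType converse() {
      switch (this) {
        case UNBOUNDED_PRECEDING: return UNBOUNDED_FOLLOWING;
        case UNBOUNDED_FOLLOWING: return UNBOUNDED_PRECEDING;
        case PRECEDING:           return FOLLOWING;
        case FOLLOWING:           return PRECEDING;
        default:                  return CURRENT_ROW;
      }
    }
  }

  public static void main(String[] args) {
    // Original window: ROWS BETWEEN 2 PRECEDING AND UNBOUNDED FOLLOWING
    BoundaryType left = BoundaryType.PRECEDING;
    BoundaryType right = BoundaryType.UNBOUNDED_FOLLOWING;

    // reverse(): the new left bound is the converse of the old right bound,
    // and the new right bound is the converse of the old left bound
    // (the offset expr, 2 here, travels with its boundary in the real class).
    BoundaryType newLeft = right.converse();   // UNBOUNDED_PRECEDING
    BoundaryType newRight = left.converse();   // FOLLOWING

    // Prints: reversed window: UNBOUNDED_PRECEDING .. FOLLOWING
    System.out.println("reversed window: " + newLeft + " .. " + newRight);
  }
}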