// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java
// ----------------------------------------------------------------------
// diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java
// new file mode 100644
// index 0000000..d0d1a85
// --- /dev/null
// +++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java
// @@ -0,0 +1,199 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.cloudera.impala.analysis;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.impala.catalog.Type;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * Encapsulates the analytic functions found in a single select block plus
 * the corresponding analytic result tuple and its substitution map.
 */
public class AnalyticInfo extends AggregateInfoBase {
  private final static Logger LOG = LoggerFactory.getLogger(AnalyticInfo.class);

  // All unique analytic exprs of a select block. Used to populate
  // super.aggregateExprs_ based on AnalyticExpr.getFnCall() for each analytic expr
  // in this list.
  private final ArrayList<Expr> analyticExprs_;

  // Intersection of the partition exprs of all the analytic functions. Empty if at
  // least one analytic function has no partition exprs.
  private final List<Expr> commonPartitionExprs_;

  // map from analyticExprs_ to their corresponding analytic tuple slotrefs
  private final ExprSubstitutionMap analyticTupleSmap_;

  /**
   * Stores clones of 'analyticExprs' and adds the function call of each analytic expr
   * to the base class's aggregate exprs. Tuple descriptors are not created here;
   * create() does that.
   */
  private AnalyticInfo(ArrayList<Expr> analyticExprs) {
    super(new ArrayList<Expr>(), new ArrayList<FunctionCallExpr>());
    analyticExprs_ = Expr.cloneList(analyticExprs);
    // Extract the analytic function calls for each analytic expr.
    for (Expr analyticExpr: analyticExprs) {
      aggregateExprs_.add(((AnalyticExpr) analyticExpr).getFnCall());
    }
    analyticTupleSmap_ = new ExprSubstitutionMap();
    commonPartitionExprs_ = computeCommonPartitionExprs();
  }

  /**
   * C'tor for cloning.
   */
  private AnalyticInfo(AnalyticInfo other) {
    super(other);
    analyticExprs_ =
        (other.analyticExprs_ != null) ? Expr.cloneList(other.analyticExprs_) : null;
    analyticTupleSmap_ = other.analyticTupleSmap_.clone();
    commonPartitionExprs_ = Expr.cloneList(other.commonPartitionExprs_);
  }

  public ArrayList<Expr> getAnalyticExprs() { return analyticExprs_; }
  public ExprSubstitutionMap getSmap() { return analyticTupleSmap_; }
  public List<Expr> getCommonPartitionExprs() { return commonPartitionExprs_; }

  /**
   * Creates complete AnalyticInfo for analyticExprs, including tuple descriptors and
   * smaps. Note that duplicates are removed from 'analyticExprs' in place, so the
   * caller's list is modified.
   */
  static public AnalyticInfo create(
      ArrayList<Expr> analyticExprs, Analyzer analyzer) {
    Preconditions.checkState(analyticExprs != null && !analyticExprs.isEmpty());
    Expr.removeDuplicates(analyticExprs);
    AnalyticInfo result = new AnalyticInfo(analyticExprs);
    result.createTupleDescs(analyzer);

    // The tuple descriptors are logical. Their slots are remapped to physical tuples
    // during plan generation.
    result.outputTupleDesc_.setIsMaterialized(false);
    result.intermediateTupleDesc_.setIsMaterialized(false);

    // Populate analyticTupleSmap_
    Preconditions.checkState(analyticExprs.size() ==
        result.outputTupleDesc_.getSlots().size());
    for (int i = 0; i < analyticExprs.size(); ++i) {
      result.analyticTupleSmap_.put(result.analyticExprs_.get(i),
          new SlotRef(result.outputTupleDesc_.getSlots().get(i)));
      result.outputTupleDesc_.getSlots().get(i).setSourceExpr(
          result.analyticExprs_.get(i));
    }
    // Only build the (potentially large) debug strings when trace logging is on.
    if (LOG.isTraceEnabled()) {
      LOG.trace("analytictuple=" + result.outputTupleDesc_.debugString());
      LOG.trace("analytictuplesmap=" + result.analyticTupleSmap_.debugString());
      LOG.trace("analytic info:\n" + result.debugString());
    }
    return result;
  }

  /**
   * Returns the intersection of the partition exprs of all the analytic functions.
   * An analytic function without partition exprs partitions over its whole input, so
   * its presence makes the intersection empty. A null partition list is treated the
   * same as an empty one; an empty result is the conservative answer.
   */
  private List<Expr> computeCommonPartitionExprs() {
    // null means "no analytic expr visited yet". It must not be conflated with an
    // empty intersection, otherwise a later function's partition exprs would be
    // added back after the intersection had already become empty.
    List<Expr> result = null;
    for (Expr analyticExpr: analyticExprs_) {
      Preconditions.checkState(analyticExpr.isAnalyzed_);
      List<Expr> partitionExprs = ((AnalyticExpr) analyticExpr).getPartitionExprs();
      if (partitionExprs == null || partitionExprs.isEmpty()) {
        result = Lists.newArrayList();
      } else if (result == null) {
        result = Lists.newArrayList(partitionExprs);
      } else {
        result.retainAll(partitionExprs);
      }
      // Once empty, the intersection cannot grow again.
      if (result.isEmpty()) break;
    }
    return (result != null) ? result : Lists.<Expr>newArrayList();
  }

  /**
   * Append ids of all slots that are being referenced in the process
   * of performing the analytic computation described by this AnalyticInfo.
   */
  public void getRefdSlots(List<SlotId> ids) {
    Preconditions.checkState(intermediateTupleDesc_ != null);
    Expr.getIds(analyticExprs_, null, ids);
    // The backend assumes that the entire intermediateTupleDesc is materialized
    for (SlotDescriptor slotDesc: intermediateTupleDesc_.getSlots()) {
      ids.add(slotDesc.getId());
    }
  }

  /**
   * For every analytic expr whose output slot is materialized, marks the
   * corresponding intermediate slot as materialized and records its index in
   * materializedSlots_. The exprs of those slots are substituted with 'smap' and
   * their referenced slots are materialized through the analyzer.
   */
  @Override
  public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap) {
    materializedSlots_.clear();
    List<Expr> exprs = Lists.newArrayList();
    for (int i = 0; i < analyticExprs_.size(); ++i) {
      SlotDescriptor outputSlotDesc = outputTupleDesc_.getSlots().get(i);
      if (!outputSlotDesc.isMaterialized()) continue;
      intermediateTupleDesc_.getSlots().get(i).setIsMaterialized(true);
      exprs.add(analyticExprs_.get(i));
      materializedSlots_.add(i);
    }
    List<Expr> resolvedExprs = Expr.substituteList(exprs, smap, analyzer, false);
    analyzer.materializeSlots(resolvedExprs);
  }

  /**
   * Validates internal state: Checks that the number of materialized slots of the
   * intermediate analytic tuple corresponds to the number of materialized analytic
   * functions. Also checks that the return types of the analytic exprs correspond to
   * the slots in that tuple.
   */
  public void checkConsistency() {
    ArrayList<SlotDescriptor> slots = intermediateTupleDesc_.getSlots();

    // Check materialized slots.
    int numMaterializedSlots = 0;
    for (SlotDescriptor slotDesc: slots) {
      if (slotDesc.isMaterialized()) ++numMaterializedSlots;
    }
    Preconditions.checkState(numMaterializedSlots ==
        materializedSlots_.size());

    // Check that analytic expr return types match the slot descriptors.
    for (int i = 0; i < analyticExprs_.size(); ++i) {
      Expr analyticExpr = analyticExprs_.get(i);
      Type slotType = slots.get(i).getType();
      Preconditions.checkState(analyticExpr.getType().equals(slotType),
          String.format("Analytic expr %s returns type %s but its analytic tuple " +
              "slot has type %s", analyticExpr.toSql(),
              analyticExpr.getType().toString(), slotType.toString()));
    }
  }

  @Override
  public String debugString() {
    StringBuilder out = new StringBuilder(super.debugString());
    out.append(Objects.toStringHelper(this)
        .add("analytic_exprs", Expr.debugString(analyticExprs_))
        .add("smap", analyticTupleSmap_.debugString())
        .toString());
    return out.toString();
  }

  @Override
  protected String tupleDebugName() { return "analytic-tuple"; }

  @Override
  public AnalyticInfo clone() { return new AnalyticInfo(this); }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java new file mode 100644 index 0000000..68558da --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package com.cloudera.impala.analysis;

import java.math.BigDecimal;

import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.service.FeSupport;
import com.cloudera.impala.thrift.TAnalyticWindow;
import com.cloudera.impala.thrift.TAnalyticWindowBoundary;
import com.cloudera.impala.thrift.TAnalyticWindowBoundaryType;
import com.cloudera.impala.thrift.TAnalyticWindowType;
import com.cloudera.impala.thrift.TColumnValue;
import com.cloudera.impala.util.TColumnValueUtil;
import com.google.common.base.Preconditions;


/**
 * Windowing clause of an analytic expr
 * Both left and right boundaries are always non-null after analyze().
 */
public class AnalyticWindow {
  // default window used when an analytic expr was given an order by but no window
  public static final AnalyticWindow DEFAULT_WINDOW = new AnalyticWindow(Type.RANGE,
      new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
      new Boundary(BoundaryType.CURRENT_ROW, null));

  enum Type {
    ROWS("ROWS"),
    RANGE("RANGE");

    private final String description_;

    private Type(String d) {
      description_ = d;
    }

    @Override
    public String toString() { return description_; }
    public TAnalyticWindowType toThrift() {
      return this == ROWS ? TAnalyticWindowType.ROWS : TAnalyticWindowType.RANGE;
    }
  }

  enum BoundaryType {
    UNBOUNDED_PRECEDING("UNBOUNDED PRECEDING"),
    UNBOUNDED_FOLLOWING("UNBOUNDED FOLLOWING"),
    CURRENT_ROW("CURRENT ROW"),
    PRECEDING("PRECEDING"),
    FOLLOWING("FOLLOWING");

    private final String description_;

    private BoundaryType(String d) {
      description_ = d;
    }

    @Override
    public String toString() { return description_; }

    /**
     * Maps a non-UNBOUNDED boundary type to its thrift counterpart. UNBOUNDED types
     * are rejected by the precondition.
     */
    public TAnalyticWindowBoundaryType toThrift() {
      Preconditions.checkState(!isAbsolutePos());
      if (this == CURRENT_ROW) {
        return TAnalyticWindowBoundaryType.CURRENT_ROW;
      } else if (this == PRECEDING) {
        return TAnalyticWindowBoundaryType.PRECEDING;
      } else if (this == FOLLOWING) {
        return TAnalyticWindowBoundaryType.FOLLOWING;
      }
      return null;
    }

    public boolean isAbsolutePos() {
      return this == UNBOUNDED_PRECEDING || this == UNBOUNDED_FOLLOWING;
    }

    public boolean isOffset() {
      return this == PRECEDING || this == FOLLOWING;
    }

    public boolean isPreceding() {
      return this == UNBOUNDED_PRECEDING || this == PRECEDING;
    }

    public boolean isFollowing() {
      return this == UNBOUNDED_FOLLOWING || this == FOLLOWING;
    }

    /** Returns the boundary type with PRECEDING and FOLLOWING swapped. */
    public BoundaryType converse() {
      switch (this) {
        case UNBOUNDED_PRECEDING: return UNBOUNDED_FOLLOWING;
        case UNBOUNDED_FOLLOWING: return UNBOUNDED_PRECEDING;
        case PRECEDING: return FOLLOWING;
        case FOLLOWING: return PRECEDING;
        default: return CURRENT_ROW;
      }
    }
  }

  public static class Boundary {
    private final BoundaryType type_;

    // Offset expr. Only set for PRECEDING/FOLLOWING. Needed for toSql().
    private final Expr expr_;

    // The offset value. Set during analysis after evaluating expr_. Integral valued
    // for ROWS windows.
    private BigDecimal offsetValue_;

    public BoundaryType getType() { return type_; }
    public Expr getExpr() { return expr_; }
    public BigDecimal getOffsetValue() { return offsetValue_; }

    /**
     * 'e' must be non-null exactly when 'type' is PRECEDING or FOLLOWING.
     */
    public Boundary(BoundaryType type, Expr e) {
      this(type, e, null);
    }

    // c'tor used by clone()
    private Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) {
      Preconditions.checkState(
          (type.isOffset() && e != null)
              || (!type.isOffset() && e == null));
      type_ = type;
      expr_ = e;
      offsetValue_ = offsetValue;
    }

    public String toSql() {
      StringBuilder sb = new StringBuilder();
      if (expr_ != null) sb.append(expr_.toSql()).append(" ");
      sb.append(type_.toString());
      return sb.toString();
    }

    /**
     * Converts to thrift. For offset boundaries of ROWS windows, the analyzed offset
     * value is included, so analyze() must have run first.
     */
    public TAnalyticWindowBoundary toThrift(Type windowType) {
      TAnalyticWindowBoundary result = new TAnalyticWindowBoundary(type_.toThrift());
      if (type_.isOffset() && windowType == Type.ROWS) {
        result.setRows_offset_value(offsetValue_.longValue());
      }
      // TODO: range windows need range_offset_predicate
      return result;
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == null) return false;
      if (obj.getClass() != this.getClass()) return false;
      Boundary o = (Boundary)obj;
      boolean exprEqual = (expr_ == null) == (o.expr_ == null);
      if (exprEqual && expr_ != null) exprEqual = expr_.equals(o.expr_);
      return type_ == o.type_ && exprEqual;
    }

    /**
     * Consistent with equals(): equal boundaries share the type and the presence of
     * an offset expr. The expr itself is not hashed, to avoid depending on how Expr
     * implements hashCode().
     */
    @Override
    public int hashCode() {
      return 31 * type_.hashCode() + (expr_ == null ? 0 : 1);
    }

    /** Returns a copy with PRECEDING/FOLLOWING swapped, keeping the offset. */
    public Boundary converse() {
      Boundary result = new Boundary(type_.converse(),
          (expr_ != null) ? expr_.clone() : null);
      result.offsetValue_ = offsetValue_;
      return result;
    }

    @Override
    public Boundary clone() {
      return new Boundary(type_, expr_ != null ? expr_.clone() : null, offsetValue_);
    }

    public void analyze(Analyzer analyzer) throws AnalysisException {
      if (expr_ != null) expr_.analyze(analyzer);
    }
  }

  private final Type type_;
  private final Boundary leftBoundary_;
  private Boundary rightBoundary_;  // may be null before analyze()
  private String toSqlString_;  // cached after analysis

  public Type getType() { return type_; }
  public Boundary getLeftBoundary() { return leftBoundary_; }
  public Boundary getRightBoundary() { return rightBoundary_; }
  public Boundary setRightBoundary(Boundary b) { return rightBoundary_ = b; }

  /**
   * Window with only a start boundary; analyze() fills in the implied CURRENT ROW
   * end boundary.
   */
  public AnalyticWindow(Type type, Boundary b) {
    type_ = type;
    Preconditions.checkNotNull(b);
    leftBoundary_ = b;
    rightBoundary_ = null;
  }

  public AnalyticWindow(Type type, Boundary l, Boundary r) {
    type_ = type;
    Preconditions.checkNotNull(l);
    leftBoundary_ = l;
    Preconditions.checkNotNull(r);
    rightBoundary_ = r;
  }

  /**
   * Clone c'tor
   */
  private AnalyticWindow(AnalyticWindow other) {
    type_ = other.type_;
    Preconditions.checkNotNull(other.leftBoundary_);
    leftBoundary_ = other.leftBoundary_.clone();
    if (other.rightBoundary_ != null) {
      rightBoundary_ = other.rightBoundary_.clone();
    }
    toSqlString_ = other.toSqlString_;  // safe to share
  }

  /**
   * Returns the window with its boundaries mirrored: the new left boundary is the
   * converse of the old right one and vice versa. If no right boundary has been set
   * yet, the implied right boundary CURRENT ROW (the value analyze() assigns) is
   * mirrored, which is again CURRENT ROW.
   */
  public AnalyticWindow reverse() {
    Boundary newRightBoundary = leftBoundary_.converse();
    Boundary newLeftBoundary = (rightBoundary_ != null)
        ? rightBoundary_.converse()
        : new Boundary(BoundaryType.CURRENT_ROW, null);
    return new AnalyticWindow(type_, newLeftBoundary, newRightBoundary);
  }

  /**
   * Returns the SQL text of this window. After analyze() this is the string cached
   * before the implied right boundary was filled in.
   */
  public String toSql() {
    if (toSqlString_ != null) return toSqlString_;
    StringBuilder sb = new StringBuilder();
    sb.append(type_.toString()).append(" ");
    if (rightBoundary_ == null) {
      sb.append(leftBoundary_.toSql());
    } else {
      sb.append("BETWEEN ").append(leftBoundary_.toSql()).append(" AND ");
      sb.append(rightBoundary_.toSql());
    }
    return sb.toString();
  }

  /**
   * Converts to thrift. Unbounded start/end boundaries are represented by leaving
   * window_start/window_end unset. Requires a non-null right boundary.
   */
  public TAnalyticWindow toThrift() {
    TAnalyticWindow result = new TAnalyticWindow(type_.toThrift());
    if (leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) {
      result.setWindow_start(leftBoundary_.toThrift(type_));
    }
    Preconditions.checkNotNull(rightBoundary_);
    if (rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) {
      result.setWindow_end(rightBoundary_.toThrift(type_));
    }
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) return false;
    if (obj.getClass() != this.getClass()) return false;
    AnalyticWindow o = (AnalyticWindow)obj;
    boolean rightBoundaryEqual =
        (rightBoundary_ == null) == (o.rightBoundary_ == null);
    if (rightBoundaryEqual && rightBoundary_ != null) {
      rightBoundaryEqual = rightBoundary_.equals(o.rightBoundary_);
    }
    return type_ == o.type_
        && leftBoundary_.equals(o.leftBoundary_)
        && rightBoundaryEqual;
  }

  /**
   * Consistent with equals(): combines the window type and both boundaries' hashes
   * (0 for a missing right boundary).
   */
  @Override
  public int hashCode() {
    int h = type_.hashCode();
    h = 31 * h + leftBoundary_.hashCode();
    h = 31 * h + (rightBoundary_ == null ? 0 : rightBoundary_.hashCode());
    return h;
  }

  @Override
  public AnalyticWindow clone() { return new AnalyticWindow(this); }

  /**
   * Semantic analysis for expr of a PRECEDING/FOLLOWING clause. Requires a constant,
   * strictly positive offset (an integer for ROWS windows, any number for RANGE
   * windows) and stores the evaluated value in the boundary's offsetValue_.
   */
  private void checkOffsetExpr(Analyzer analyzer, Boundary boundary)
      throws AnalysisException {
    Preconditions.checkState(boundary.getType().isOffset());
    Expr e = boundary.getExpr();
    Preconditions.checkNotNull(e);
    boolean isPos = true;
    Double val = null;
    if (e.isConstant() && e.getType().isNumericType()) {
      try {
        val = TColumnValueUtil.getNumericVal(
            FeSupport.EvalConstExpr(e, analyzer.getQueryCtx()));
        // Written as !(val > 0) so that NaN is also rejected as non-positive.
        if (!(val > 0)) isPos = false;
      } catch (InternalException exc) {
        throw new AnalysisException(
            "Couldn't evaluate PRECEDING/FOLLOWING expression: " + exc.getMessage());
      }
    }

    if (type_ == Type.ROWS) {
      if (!e.isConstant() || !e.getType().isIntegerType() || !isPos) {
        throw new AnalysisException(
            "For ROWS window, the value of a PRECEDING/FOLLOWING offset must be a " +
                "constant positive integer: " + boundary.toSql());
      }
      Preconditions.checkNotNull(val);
      boundary.offsetValue_ = BigDecimal.valueOf(val.longValue());
    } else {
      if (!e.isConstant() || !e.getType().isNumericType() || !isPos) {
        throw new AnalysisException(
            "For RANGE window, the value of a PRECEDING/FOLLOWING offset must be a " +
                "constant positive number: " + boundary.toSql());
      }
      // valueOf() uses the canonical decimal form of the double; new BigDecimal(double)
      // would capture the full binary expansion (e.g. 0.1 -> 0.1000000000000000055...).
      boundary.offsetValue_ = BigDecimal.valueOf(val);
    }
  }

  /**
   * Check that b1 <= b2.
   */
  private void checkOffsetBoundaries(Analyzer analyzer, Boundary b1, Boundary b2)
      throws AnalysisException {
    Preconditions.checkState(b1.getType().isOffset());
    Preconditions.checkState(b2.getType().isOffset());
    Expr e1 = b1.getExpr();
    Preconditions.checkState(
        e1 != null && e1.isConstant() && e1.getType().isNumericType());
    Expr e2 = b2.getExpr();
    Preconditions.checkState(
        e2 != null && e2.isConstant() && e2.getType().isNumericType());

    try {
      TColumnValue val1 = FeSupport.EvalConstExpr(e1, analyzer.getQueryCtx());
      TColumnValue val2 = FeSupport.EvalConstExpr(e2, analyzer.getQueryCtx());
      double left = TColumnValueUtil.getNumericVal(val1);
      double right = TColumnValueUtil.getNumericVal(val2);
      if (left > right) {
        throw new AnalysisException(
            "Offset boundaries are in the wrong order: " + toSql());
      }
    } catch (InternalException exc) {
      throw new AnalysisException(
          "Couldn't evaluate PRECEDING/FOLLOWING expression: " + exc.getMessage());
    }

  }

  /**
   * Analyzes both boundaries and validates the window:
   *  - UNBOUNDED FOLLOWING may not start a window; UNBOUNDED PRECEDING may not end it.
   *  - RANGE windows are restricted to UNBOUNDED/CURRENT ROW combinations.
   *  - A lone FOLLOWING boundary requires BETWEEN.
   *  - Offset exprs must be constant and positive, and FOLLOWING..FOLLOWING /
   *    PRECEDING..PRECEDING offsets must be correctly ordered.
   * If no right boundary was given, the SQL text is cached first and the right
   * boundary is then set to CURRENT ROW.
   */
  public void analyze(Analyzer analyzer) throws AnalysisException {
    leftBoundary_.analyze(analyzer);
    if (rightBoundary_ != null) rightBoundary_.analyze(analyzer);

    if (leftBoundary_.getType() == BoundaryType.UNBOUNDED_FOLLOWING) {
      throw new AnalysisException(
          leftBoundary_.getType().toString() + " is only allowed for upper bound of " +
              "BETWEEN");
    }
    if (rightBoundary_ != null
        && rightBoundary_.getType() == BoundaryType.UNBOUNDED_PRECEDING) {
      throw new AnalysisException(
          rightBoundary_.getType().toString() + " is only allowed for lower bound of " +
              "BETWEEN");
    }

    // TODO: Remove when RANGE windows with offset boundaries are supported.
    if (type_ == Type.RANGE) {
      if (leftBoundary_.type_.isOffset()
          || (rightBoundary_ != null && rightBoundary_.type_.isOffset())
          || (leftBoundary_.type_ == BoundaryType.CURRENT_ROW
              && (rightBoundary_ == null
                  || rightBoundary_.type_ == BoundaryType.CURRENT_ROW))) {
        throw new AnalysisException(
            "RANGE is only supported with both the lower and upper bounds UNBOUNDED or"
                + " one UNBOUNDED and the other CURRENT ROW.");
      }
    }

    if (rightBoundary_ == null && leftBoundary_.getType() == BoundaryType.FOLLOWING) {
      throw new AnalysisException(
          leftBoundary_.getType().toString() + " requires a BETWEEN clause");
    }

    if (leftBoundary_.getType().isOffset()) checkOffsetExpr(analyzer, leftBoundary_);
    if (rightBoundary_ == null) {
      // set right boundary to implied value, but make sure to cache toSql string
      // beforehand
      toSqlString_ = toSql();
      rightBoundary_ = new Boundary(BoundaryType.CURRENT_ROW, null);
      return;
    }
    if (rightBoundary_.getType().isOffset()) checkOffsetExpr(analyzer, rightBoundary_);

    if (leftBoundary_.getType() == BoundaryType.FOLLOWING) {
      if (rightBoundary_.getType() != BoundaryType.FOLLOWING
          && rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) {
        throw new AnalysisException(
            "A lower window bound of " + BoundaryType.FOLLOWING.toString()
                + " requires that the upper bound also be "
                + BoundaryType.FOLLOWING.toString());
      }
      if (rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) {
        checkOffsetBoundaries(analyzer, leftBoundary_, rightBoundary_);
      }
    }

    if (rightBoundary_.getType() == BoundaryType.PRECEDING) {
      if (leftBoundary_.getType() != BoundaryType.PRECEDING
          && leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) {
        throw new AnalysisException(
            "An upper window bound of " + BoundaryType.PRECEDING.toString()
                + " requires that the lower bound also be "
                + BoundaryType.PRECEDING.toString());
      }
      if (leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) {
        // For PRECEDING..PRECEDING the right offset must not exceed the left offset.
        checkOffsetBoundaries(analyzer, rightBoundary_, leftBoundary_);
      }
    }
  }
}
