http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java new file mode 100644 index 0000000..a00bf53 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java @@ -0,0 +1,680 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.cloudera.impala.catalog.Table; +import com.cloudera.impala.common.Id; +import com.cloudera.impala.common.IdGenerator; +import com.cloudera.impala.thrift.TEdgeType; +import com.cloudera.impala.thrift.TQueryCtx; +import com.cloudera.impala.thrift.TLineageGraph; +import com.cloudera.impala.thrift.TMultiEdge; +import com.cloudera.impala.thrift.TVertex; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +/** + * Represents a vertex in the column lineage graph. A Vertex may correspond to a base + * table column, a column in the destination table (for the case of INSERT or CTAS + * queries) or a result expr (labeled column of a query result set). + */ +final class Vertex implements Comparable<Vertex> { + // Unique identifier of this vertex. + private final VertexId id_; + + private final String type_ = "COLUMN"; + + // A fully-qualified column name or the label of a result expr + private final String label_; + + public Vertex(VertexId id, String label) { + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(label); + id_ = id; + label_ = label; + } + public VertexId getVertexId() { return id_; } + public String getLabel() { return label_; } + public String getType() { return type_; } + + @Override + public String toString() { return "(" + id_ + ":" + type_ + ":" + label_ + ")"; } + + /** + * Encodes this Vertex object into a JSON object represented by a Map. 
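+ * For example (hypothetical), a base-table column vertex serializes as
+ *   {"id":1, "vertexType":"COLUMN", "vertexId":"functional.alltypes.id"}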
+ */ + public Map toJson() { + // Use a LinkedHashMap to generate a strict ordering of elements. + Map obj = new LinkedHashMap(); + obj.put("id", id_.asInt()); + obj.put("vertexType", type_); + obj.put("vertexId", label_); + return obj; + } + + /** + * Constructs a Vertex object from a JSON object. The new object is returned. + */ + public static Vertex fromJsonObj(JSONObject obj) { + int id = ((Long) obj.get("id")).intValue(); + String label = (String) obj.get("vertexId"); + return new Vertex(new VertexId(id), label); + } + + /** + * Encodes this Vertex object into a thrift object + */ + public TVertex toThrift() { + return new TVertex(id_.asInt(), label_); + } + + /** + * Constructs a Vertex object from a thrift object. + */ + public static Vertex fromThrift(TVertex vertex) { + int id = ((Long) vertex.id).intValue(); + return new Vertex(new VertexId(id), vertex.label); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (obj.getClass() != this.getClass()) return false; + Vertex vertex = (Vertex) obj; + return this.id_.equals(vertex.id_) && + this.label_.equals(vertex.label_); + } + + public int compareTo(Vertex cmp) { return this.id_.compareTo(cmp.id_); } + + @Override + public int hashCode() { return id_.hashCode(); } +} + +/** + * Represents the unique identifier of a Vertex. + */ +class VertexId extends Id<VertexId> { + protected VertexId(int id) { + super(id); + } + public static IdGenerator<VertexId> createGenerator() { + return new IdGenerator<VertexId>() { + @Override + public VertexId getNextId() { return new VertexId(nextId_++); } + @Override + public VertexId getMaxId() { return new VertexId(nextId_ - 1); } + }; + } +} + +/** + * Represents a set of uni-directional edges in the column lineage graph, one edge from + * every source Vertex in 'sources_' to every target Vertex in 'targets_'. An edge + * indicates a dependency between a source and a target Vertex. There are two types of + * edges, PROJECTION and PREDICATE, that are described in the ColumnLineageGraph class. + */ +final class MultiEdge { + public static enum EdgeType { + PROJECTION, PREDICATE + } + private final Set<Vertex> sources_; + private final Set<Vertex> targets_; + private final EdgeType edgeType_; + + public MultiEdge(Set<Vertex> sources, Set<Vertex> targets, EdgeType type) { + sources_ = sources; + targets_ = targets; + edgeType_ = type; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + Joiner joiner = Joiner.on(","); + builder.append("Sources: ["); + builder.append(joiner.join(sources_) + "]\n"); + builder.append("Targets: ["); + builder.append(joiner.join(targets_) + "]\n"); + builder.append("Type: " + edgeType_); + return builder.toString(); + } + + /** + * Encodes this MultiEdge object to a JSON object represented by a Map. 
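+ * For example (hypothetical), an edge from two source vertices to one target
+ * vertex lists the vertex ids and the edge type:
+ *   {"sources":[1,2], "targets":[0], "edgeType":"PROJECTION"}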
+ */ + public Map toJson() { + Map obj = new LinkedHashMap(); + // Add sources + JSONArray sourceIds = new JSONArray(); + for (Vertex vertex: sources_) { + sourceIds.add(vertex.getVertexId()); + } + obj.put("sources", sourceIds); + // Add targets + JSONArray targetIds = new JSONArray(); + for (Vertex vertex: targets_) { + targetIds.add(vertex.getVertexId()); + } + obj.put("targets", targetIds); + obj.put("edgeType", edgeType_.toString()); + return obj; + } + + /** + * Encodes this MultiEdge object to a thrift object + */ + public TMultiEdge toThrift() { + List<TVertex> sources = Lists.newArrayList(); + for (Vertex vertex: sources_) { + sources.add(vertex.toThrift()); + } + List<TVertex> targets = Lists.newArrayList(); + for (Vertex vertex: targets_) { + targets.add(vertex.toThrift()); + } + if (edgeType_ == EdgeType.PROJECTION) { + return new TMultiEdge(sources, targets, TEdgeType.PROJECTION); + } + return new TMultiEdge(sources, targets, TEdgeType.PREDICATE); + } + + /** + * Constructs a MultiEdge object from a thrift object + */ + public static MultiEdge fromThrift(TMultiEdge obj){ + Set<Vertex> sources = Sets.newHashSet(); + for (TVertex vertex: obj.sources) { + sources.add(Vertex.fromThrift(vertex)); + } + Set<Vertex> targets = Sets.newHashSet(); + for (TVertex vertex: obj.targets) { + targets.add(Vertex.fromThrift(vertex)); + } + if (obj.edgetype == TEdgeType.PROJECTION) { + return new MultiEdge(sources, targets, EdgeType.PROJECTION); + } + return new MultiEdge(sources, targets, EdgeType.PREDICATE); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (obj.getClass() != this.getClass()) return false; + MultiEdge edge = (MultiEdge) obj; + return edge.sources_.equals(this.sources_) && + edge.targets_.equals(this.targets_) && + edge.edgeType_ == this.edgeType_; + } +} + +/** + * Represents the column lineage graph of a query. This is a directional graph that is + * used to track dependencies among the table/column entities that participate in + * a query. There are two types of dependencies that are represented as edges in the + * column lineage graph: + * a) Projection dependency: This is a dependency between a set of source + * columns (base table columns) and a single target (result expr or table column). + * This dependency indicates that values of the target depend on the values of the source + * columns. + * b) Predicate dependency: This is a dependency between a set of target + * columns (or exprs) and a set of source columns (base table columns). It indicates that + * the source columns restrict the values of their targets (e.g. by participating in + * WHERE clause predicates). + * + * The following dependencies are generated for a query: + * - Exactly one projection dependency for every result expr / target column. + * - Exactly one predicate dependency that targets all result exprs / target cols and + * depends on all columns participating in a conjunct in the query. + * - Special case of analytic fns: One predicate dependency per result expr / target col + * whose value is directly or indirectly affected by an analytic function with a + * partition by and/or order by clause. 
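+ *
+ * For example (hypothetical query, for illustration only): for
+ *   SELECT a + b AS s FROM t WHERE c > 10
+ * the graph contains a PROJECTION dependency {t.a, t.b} -> {s} and a PREDICATE
+ * dependency {t.c} -> {s}.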
+ */ +public class ColumnLineageGraph { + private final static Logger LOG = LoggerFactory.getLogger(ColumnLineageGraph.class); + // Query statement + private String queryStr_; + + // Name of the user that issued this query + private String user_; + + private final List<Expr> resultDependencyPredicates_ = Lists.newArrayList(); + + private final List<MultiEdge> edges_ = Lists.newArrayList(); + + // Timestamp in seconds since epoch (GMT) this query was submitted for execution. + private long timestamp_; + + // Map of Vertex labels to Vertex objects. + private final Map<String, Vertex> vertices_ = Maps.newHashMap(); + + // Map of Vertex ids to Vertex objects. Used primarily during the construction of the + // ColumnLineageGraph from a serialized JSON object. + private final Map<VertexId, Vertex> idToVertexMap_ = Maps.newHashMap(); + + // For an INSERT or a CTAS, these are the columns of the + // destination table plus any partitioning columns (when dynamic partitioning is used). + // For a SELECT stmt, they are the labels of the result exprs. + private final List<String> targetColumnLabels_ = Lists.newArrayList(); + + // Repository for tuple and slot descriptors for this query. Use it to construct the + // column lineage graph. + private DescriptorTable descTbl_; + + private final IdGenerator<VertexId> vertexIdGenerator = VertexId.createGenerator(); + + public ColumnLineageGraph() { } + + /** + * Private c'tor, used only for testing. + */ + private ColumnLineageGraph(String stmt, String user, long timestamp) { + queryStr_ = stmt; + user_ = user; + timestamp_ = timestamp; + } + + private void setVertices(Set<Vertex> vertices) { + for (Vertex vertex: vertices) { + vertices_.put(vertex.getLabel(), vertex); + idToVertexMap_.put(vertex.getVertexId(), vertex); + } + } + + /** + * Creates a new MultiEdge in the column lineage graph from the sets of 'sources' and + * 'targets' labels (representing column names or result expr labels). The new + * MultiEdge object is returned. + */ + private MultiEdge createMultiEdge(Set<String> targets, Set<String> sources, + MultiEdge.EdgeType type) { + Set<Vertex> targetVertices = Sets.newHashSet(); + for (String target: targets) { + targetVertices.add(createVertex(target)); + } + Set<Vertex> sourceVertices = Sets.newHashSet(); + for (String source: sources) { + sourceVertices.add(createVertex(source)); + } + MultiEdge edge = new MultiEdge(sourceVertices, targetVertices, type); + edges_.add(edge); + return edge; + } + + /** + * Creates a new vertex in the column lineage graph. The new Vertex object is + * returned. If a Vertex with the same label already exists, reuse it. + */ + private Vertex createVertex(String label) { + Vertex newVertex = vertices_.get(label); + if (newVertex != null) return newVertex; + newVertex = new Vertex(vertexIdGenerator.getNextId(), label); + vertices_.put(newVertex.getLabel(), newVertex); + idToVertexMap_.put(newVertex.getVertexId(), newVertex); + return newVertex; + } + + /** + * Computes the column lineage graph of a query from the list of query result exprs. + * 'rootAnalyzer' is the Analyzer that was used for the analysis of the query. + */ + public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) { + init(rootAnalyzer); + computeProjectionDependencies(resultExprs); + computeResultPredicateDependencies(rootAnalyzer); + } + + /** + * Initialize the ColumnLineageGraph from the root analyzer of a query. 
+ */ + private void init(Analyzer analyzer) { + Preconditions.checkNotNull(analyzer); + Preconditions.checkState(analyzer.isRootAnalyzer()); + TQueryCtx queryCtx = analyzer.getQueryCtx(); + if (queryCtx.request.isSetRedacted_stmt()) { + queryStr_ = queryCtx.request.redacted_stmt; + } else { + queryStr_ = queryCtx.request.stmt; + } + Preconditions.checkNotNull(queryStr_); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + try { + timestamp_ = df.parse(queryCtx.now_string).getTime() / 1000; + } catch (java.text.ParseException e) { + LOG.error("Error parsing timestamp value: " + queryCtx.now_string + + " " + e.getMessage()); + timestamp_ = new Date().getTime() / 1000; + } + descTbl_ = analyzer.getDescTbl(); + user_ = analyzer.getUser().getName(); + } + + private void computeProjectionDependencies(List<Expr> resultExprs) { + Preconditions.checkNotNull(resultExprs); + Preconditions.checkState(!resultExprs.isEmpty()); + Preconditions.checkState(resultExprs.size() == targetColumnLabels_.size()); + for (int i = 0; i < resultExprs.size(); ++i) { + Expr expr = resultExprs.get(i); + Set<String> sourceBaseCols = Sets.newHashSet(); + List<Expr> dependentExprs = Lists.newArrayList(); + getSourceBaseCols(expr, sourceBaseCols, dependentExprs, false); + Set<String> targets = Sets.newHashSet(targetColumnLabels_.get(i)); + createMultiEdge(targets, sourceBaseCols, MultiEdge.EdgeType.PROJECTION); + if (!dependentExprs.isEmpty()) { + // We have additional exprs that 'expr' has a predicate dependency on. + // Gather the transitive predicate dependencies of 'expr' based on its direct + // predicate dependencies. For each direct predicate dependency p, 'expr' is + // transitively predicate dependent on all exprs that p is projection and + // predicate dependent on. + Set<String> predicateBaseCols = Sets.newHashSet(); + for (Expr dependentExpr: dependentExprs) { + getSourceBaseCols(dependentExpr, predicateBaseCols, null, true); + } + createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE); + } + } + } + + /** + * Compute predicate dependencies for the query result, i.e. exprs that affect the + * possible values of the result exprs / target columns, such as predicates in a WHERE + * clause. + */ + private void computeResultPredicateDependencies(Analyzer analyzer) { + List<Expr> conjuncts = analyzer.getConjuncts(); + for (Expr expr: conjuncts) { + if (expr.isAuxExpr()) continue; + resultDependencyPredicates_.add(expr); + } + Set<String> predicateBaseCols = Sets.newHashSet(); + for (Expr expr: resultDependencyPredicates_) { + getSourceBaseCols(expr, predicateBaseCols, null, true); + } + if (predicateBaseCols.isEmpty()) return; + Set<String> targets = Sets.newHashSet(targetColumnLabels_); + createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE); + } + + /** + * Identify the base table columns that 'expr' is connected to by recursively resolving + * all associated slots through inline views and materialization points to base-table + * slots. If 'directPredDeps' is not null, it is populated with the exprs that + * have a predicate dependency with 'expr' (e.g. partitioning and order by exprs for + * the case of an analytic function). If 'traversePredDeps' is false, not all the + * children exprs of 'expr' are used to identify the base columns that 'expr' is + * connected to. Which children are filtered depends on the type of 'expr' (e.g. for + * AnalyticFunctionExpr, grouping and sorting exprs are filtered out). 
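+ * For example (hypothetical): for SUM(x) OVER (PARTITION BY y ORDER BY z),
+ * getProjectionDeps() returns {x} (the analytic fn call arguments) and
+ * getPredicateDeps() returns {y, z} (the partition-by and order-by exprs).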
+ */ + private void getSourceBaseCols(Expr expr, Set<String> sourceBaseCols, + List<Expr> directPredDeps, boolean traversePredDeps) { + List<Expr> exprsToTraverse = getProjectionDeps(expr); + List<Expr> predicateDepExprs = getPredicateDeps(expr); + if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs); + if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs); + List<SlotId> slotIds = Lists.newArrayList(); + for (Expr e: exprsToTraverse) { + e.getIds(null, slotIds); + } + for (SlotId slotId: slotIds) { + SlotDescriptor slotDesc = descTbl_.getSlotDesc(slotId); + List<Expr> sourceExprs = slotDesc.getSourceExprs(); + if (sourceExprs.isEmpty() && slotDesc.isScanSlot() && + slotDesc.getPath().isRootedAtTuple()) { + // slot should correspond to a materialized tuple of a table + Preconditions.checkState(slotDesc.getParent().isMaterialized()); + List<String> path = slotDesc.getPath().getCanonicalPath(); + sourceBaseCols.add(Joiner.on(".").join(path)); + } else { + for (Expr sourceExpr: sourceExprs) { + getSourceBaseCols(sourceExpr, sourceBaseCols, directPredDeps, + traversePredDeps); + } + } + } + } + + /** + * Retrieve the exprs that 'e' is directly projection dependent on. + * TODO Handle conditional exprs (e.g. CASE, IF). + */ + private List<Expr> getProjectionDeps(Expr e) { + Preconditions.checkNotNull(e); + List<Expr> outputExprs = Lists.newArrayList(); + if (e instanceof AnalyticExpr) { + AnalyticExpr analytic = (AnalyticExpr) e; + outputExprs.addAll(analytic.getChildren().subList(0, + analytic.getFnCall().getParams().size())); + } else { + outputExprs.add(e); + } + return outputExprs; + } + + /** + * Retrieve the exprs that 'e' is directly predicate dependent on. + * TODO Handle conditional exprs (e.g. CASE, IF). + */ + private List<Expr> getPredicateDeps(Expr e) { + Preconditions.checkNotNull(e); + List<Expr> outputExprs = Lists.newArrayList(); + if (e instanceof AnalyticExpr) { + AnalyticExpr analyticExpr = (AnalyticExpr) e; + outputExprs.addAll(analyticExpr.getPartitionExprs()); + for (OrderByElement orderByElem: analyticExpr.getOrderByElements()) { + outputExprs.add(orderByElem.getExpr()); + } + } + return outputExprs; + } + + public void addDependencyPredicates(Collection<Expr> exprs) { + resultDependencyPredicates_.addAll(exprs); + } + + /** + * Encodes the ColumnLineageGraph object to JSON. 
+ */ + public String toJson() { + if (Strings.isNullOrEmpty(queryStr_)) return ""; + Map obj = new LinkedHashMap(); + obj.put("queryText", queryStr_); + obj.put("hash", getQueryHash(queryStr_)); + obj.put("user", user_); + obj.put("timestamp", timestamp_); + // Add edges + JSONArray edges = new JSONArray(); + for (MultiEdge edge: edges_) { + edges.add(edge.toJson()); + } + obj.put("edges", edges); + // Add vertices + TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values()); + JSONArray vertices = new JSONArray(); + for (Vertex vertex: sortedVertices) { + vertices.add(vertex.toJson()); + } + obj.put("vertices", vertices); + return JSONValue.toJSONString(obj); + } + + /** + * Serializes the ColumnLineageGraph to a thrift object + */ + public TLineageGraph toThrift() { + TLineageGraph graph = new TLineageGraph(); + if (Strings.isNullOrEmpty(queryStr_)) return graph; + graph.setQuery_text(queryStr_); + graph.setHash(getQueryHash(queryStr_)); + graph.setUser(user_); + graph.setStarted(timestamp_); + // Add edges + List<TMultiEdge> edges = Lists.newArrayList(); + for (MultiEdge edge: edges_) { + edges.add(edge.toThrift()); + } + graph.setEdges(edges); + // Add vertices + TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values()); + List<TVertex> vertices = Lists.newArrayList(); + for (Vertex vertex: sortedVertices) { + vertices.add(vertex.toThrift()); + } + graph.setVertices(vertices); + return graph; + } + + /** + * Creates a LineageGraph object from a thrift object + */ + public static ColumnLineageGraph fromThrift(TLineageGraph obj) { + ColumnLineageGraph lineage = + new ColumnLineageGraph(obj.query_text, obj.user, obj.started); + TreeSet<Vertex> vertices = Sets.newTreeSet(); + for (TVertex vertex: obj.vertices) { + vertices.add(Vertex.fromThrift(vertex)); + } + lineage.setVertices(vertices); + for (TMultiEdge edge: obj.edges) { + MultiEdge e = MultiEdge.fromThrift(edge); + lineage.edges_.add(e); + } + return lineage; + } + + private String getQueryHash(String queryStr) { + Hasher hasher = Hashing.md5().newHasher(); + hasher.putString(queryStr); + return hasher.hash().toString(); + } + + /** + * Creates a ColumnLineageGraph object from a serialized JSON record. The new + * ColumnLineageGraph object is returned. Used only during testing. 
+ */ + public static ColumnLineageGraph createFromJSON(String json) { + if (json == null || json.isEmpty()) return null; + JSONParser parser = new JSONParser(); + Object obj = null; + try { + obj = parser.parse(json); + } catch (ParseException e) { + LOG.error("Error parsing serialized column lineage graph: " + e.getMessage()); + return null; + } + if (!(obj instanceof JSONObject)) return null; + JSONObject jsonObj = (JSONObject) obj; + String stmt = (String) jsonObj.get("queryText"); + String hash = (String) jsonObj.get("hash"); + String user = (String) jsonObj.get("user"); + long timestamp = (Long) jsonObj.get("timestamp"); + ColumnLineageGraph graph = new ColumnLineageGraph(stmt, user, timestamp); + JSONArray serializedVertices = (JSONArray) jsonObj.get("vertices"); + Set<Vertex> vertices = Sets.newHashSet(); + for (int i = 0; i < serializedVertices.size(); ++i) { + Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i)); + vertices.add(v); + } + graph.setVertices(vertices); + JSONArray serializedEdges = (JSONArray) jsonObj.get("edges"); + for (int i = 0; i < serializedEdges.size(); ++i) { + MultiEdge e = + graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i)); + graph.edges_.add(e); + } + return graph; + } + + private MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge) { + Preconditions.checkNotNull(jsonEdge); + JSONArray sources = (JSONArray) jsonEdge.get("sources"); + Set<Vertex> sourceVertices = getVerticesFromJSONArray(sources); + JSONArray targets = (JSONArray) jsonEdge.get("targets"); + Set<Vertex> targetVertices = getVerticesFromJSONArray(targets); + MultiEdge.EdgeType type = + MultiEdge.EdgeType.valueOf((String) jsonEdge.get("edgeType")); + return new MultiEdge(sourceVertices, targetVertices, type); + } + + private Set<Vertex> getVerticesFromJSONArray(JSONArray vertexIdArray) { + Set<Vertex> vertices = Sets.newHashSet(); + for (int i = 0; i < vertexIdArray.size(); ++i) { + int sourceId = ((Long) vertexIdArray.get(i)).intValue(); + Vertex sourceVertex = idToVertexMap_.get(new VertexId(sourceId)); + Preconditions.checkNotNull(sourceVertex); + vertices.add(sourceVertex); + } + return vertices; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (obj.getClass() != this.getClass()) return false; + ColumnLineageGraph g = (ColumnLineageGraph) obj; + if (!this.vertices_.equals(g.vertices_) || + !this.edges_.equals(g.edges_)) { + return false; + } + return true; + } + + public String debugString() { + StringBuilder builder = new StringBuilder(); + for (MultiEdge edge: edges_) { + builder.append(edge.toString() + "\n"); + } + builder.append(toJson()); + return builder.toString(); + } + + public void addTargetColumnLabels(Collection<String> columnLabels) { + Preconditions.checkNotNull(columnLabels); + targetColumnLabels_.addAll(columnLabels); + } + + public void addTargetColumnLabels(Table dstTable) { + Preconditions.checkNotNull(dstTable); + String tblFullName = dstTable.getFullName(); + for (String columnName: dstTable.getColumnNames()) { + targetColumnLabels_.add(tblFullName + "." + columnName); + } + } +}
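The JSON layout produced by toJson() above (and read back by the test-only createFromJSON()) follows directly from the field names in the code. A minimal round-trip sketch, using hypothetical query text and column labels:

    // Hypothetical serialized lineage: one PROJECTION edge from a base-table column
    // vertex to a result-expr vertex, using the field names from toJson().
    String json =
        "{\"queryText\":\"select int_col from functional.alltypes\"," +
        " \"hash\":\"abcdef0123456789abcdef0123456789\"," +
        " \"user\":\"test_user\"," +
        " \"timestamp\":1446159271," +
        " \"edges\":[{\"sources\":[1],\"targets\":[0],\"edgeType\":\"PROJECTION\"}]," +
        " \"vertices\":[" +
        "{\"id\":0,\"vertexType\":\"COLUMN\",\"vertexId\":\"int_col\"}," +
        "{\"id\":1,\"vertexType\":\"COLUMN\",\"vertexId\":\"functional.alltypes.int_col\"}]}";
    ColumnLineageGraph graph = ColumnLineageGraph.createFromJSON(json);
    // Serializing again reproduces the same vertices and edges; note that toJson()
    // recomputes the "hash" field from the query text via getQueryHash().
    String roundTripped = graph.toJson();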
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java new file mode 100644 index 0000000..4869004 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.ArrayList; +import java.util.List; + +import com.cloudera.impala.catalog.Db; +import com.cloudera.impala.catalog.Function.CompareMode; +import com.cloudera.impala.catalog.ScalarFunction; +import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TExprNode; +import com.cloudera.impala.thrift.TExprNodeType; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * &&, ||, ! predicates. + * + */ +public class CompoundPredicate extends Predicate { + public enum Operator { + AND("AND"), + OR("OR"), + NOT("NOT"); + + private final String description; + + private Operator(String description) { + this.description = description; + } + + @Override + public String toString() { + return description; + } + } + private final Operator op_; + + public static void initBuiltins(Db db) { + // AND and OR are implemented as custom exprs, so they do not have a function symbol. + db.addBuiltin(ScalarFunction.createBuiltinOperator( + Operator.AND.name(), "", + Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN)); + db.addBuiltin(ScalarFunction.createBuiltinOperator( + Operator.OR.name(), "", + Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN)); + db.addBuiltin(ScalarFunction.createBuiltinOperator( + Operator.NOT.name(), "impala::CompoundPredicate::Not", + Lists.<Type>newArrayList(Type.BOOLEAN), Type.BOOLEAN)); + } + + public CompoundPredicate(Operator op, Expr e1, Expr e2) { + super(); + this.op_ = op; + Preconditions.checkNotNull(e1); + children_.add(e1); + Preconditions.checkArgument(op == Operator.NOT && e2 == null + || op != Operator.NOT && e2 != null); + if (e2 != null) children_.add(e2); + } + + /** + * Copy c'tor used in clone(). 
+ */ + protected CompoundPredicate(CompoundPredicate other) { + super(other); + op_ = other.op_; + } + + public Operator getOp() { return op_; } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) return false; + return ((CompoundPredicate) obj).op_ == op_; + } + + @Override + public String debugString() { + return Objects.toStringHelper(this) + .add("op", op_) + .addValue(super.debugString()) + .toString(); + } + + @Override + public String toSqlImpl() { + if (children_.size() == 1) { + Preconditions.checkState(op_ == Operator.NOT); + return "NOT " + getChild(0).toSql(); + } else { + return getChild(0).toSql() + " " + op_.toString() + " " + getChild(1).toSql(); + } + } + + @Override + protected void toThrift(TExprNode msg) { + msg.node_type = TExprNodeType.COMPOUND_PRED; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (isAnalyzed_) return; + super.analyze(analyzer); + + // Check that children are predicates. + for (Expr e: children_) { + if (!e.getType().isBoolean() && !e.getType().isNull()) { + throw new AnalysisException(String.format("Operand '%s' part of predicate " + + "'%s' should return type 'BOOLEAN' but returns type '%s'.", + e.toSql(), toSql(), e.getType().toSql())); + } + } + + fn_ = getBuiltinFunction(analyzer, op_.toString(), collectChildReturnTypes(), + CompareMode.IS_NONSTRICT_SUPERTYPE_OF); + Preconditions.checkState(fn_ != null); + Preconditions.checkState(fn_.getReturnType().isBoolean()); + castForFunctionCall(false); + if (hasChildCosts()) evalCost_ = getChildCosts() + COMPOUND_PREDICATE_COST; + + if (!getChild(0).hasSelectivity() || + (children_.size() == 2 && !getChild(1).hasSelectivity())) { + // Give up if one of our children has an unknown selectivity. + selectivity_ = -1; + return; + } + + switch (op_) { + case AND: + selectivity_ = getChild(0).selectivity_ * getChild(1).selectivity_; + break; + case OR: + selectivity_ = getChild(0).selectivity_ + getChild(1).selectivity_ + - getChild(0).selectivity_ * getChild(1).selectivity_; + break; + case NOT: + selectivity_ = 1.0 - getChild(0).selectivity_; + break; + } + selectivity_ = Math.max(0.0, Math.min(1.0, selectivity_)); + } + + /** + * Retrieve the slots bound by BinaryPredicate, InPredicate and + * CompoundPredicates in the subtree rooted at 'this'. + */ + public ArrayList<SlotRef> getBoundSlots() { + ArrayList<SlotRef> slots = Lists.newArrayList(); + for (int i = 0; i < getChildren().size(); ++i) { + if (getChild(i) instanceof BinaryPredicate || + getChild(i) instanceof InPredicate) { + slots.add(((Predicate)getChild(i)).getBoundSlot()); + } else if (getChild(i) instanceof CompoundPredicate) { + slots.addAll(((CompoundPredicate)getChild(i)).getBoundSlots()); + } + } + return slots; + } + + /** + * Negates a CompoundPredicate. + */ + @Override + public Expr negate() { + if (op_ == Operator.NOT) return getChild(0); + Expr negatedLeft = getChild(0).negate(); + Expr negatedRight = getChild(1).negate(); + Operator newOp = (op_ == Operator.OR) ? Operator.AND : Operator.OR; + return new CompoundPredicate(newOp, negatedLeft, negatedRight); + } + + /** + * Creates a conjunctive predicate from a list of exprs. 
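+ *
+ * For example (hypothetical exprs p, q, r): createConjunctivePredicate(
+ * Lists.newArrayList(p, q, r)) returns the tree AND(r, AND(q, p)), and null is
+ * returned for an empty list.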
+ */ + public static Expr createConjunctivePredicate(List<Expr> conjuncts) { + Expr conjunctivePred = null; + for (Expr expr: conjuncts) { + if (conjunctivePred == null) { + conjunctivePred = expr; + continue; + } + conjunctivePred = new CompoundPredicate(CompoundPredicate.Operator.AND, + expr, conjunctivePred); + } + return conjunctivePred; + } + + @Override + public Expr clone() { return new CompoundPredicate(this); } + + // Create an AND predicate between two exprs, 'lhs' and 'rhs'. If + // 'rhs' is null, simply return 'lhs'. + public static Expr createConjunction(Expr lhs, Expr rhs) { + if (rhs == null) return lhs; + return new CompoundPredicate(Operator.AND, rhs, lhs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java new file mode 100644 index 0000000..cd01713 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java @@ -0,0 +1,553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.log4j.Logger; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.Column; +import com.cloudera.impala.catalog.HBaseTable; +import com.cloudera.impala.catalog.HdfsPartition; +import com.cloudera.impala.catalog.HdfsTable; +import com.cloudera.impala.catalog.Table; +import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.catalog.View; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.common.PrintUtils; +import com.cloudera.impala.thrift.TComputeStatsParams; +import com.cloudera.impala.thrift.TPartitionStats; +import com.cloudera.impala.thrift.TTableName; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Represents a COMPUTE STATS <table> and COMPUTE INCREMENTAL STATS <table> [PARTITION + * <part_spec>] statement for statistics collection. The former statement gathers all + * table and column stats for a given table and stores them in the Metastore via the + * CatalogService. All existing stats for that table are replaced and no existing stats + * are reused. The latter, incremental form, similarly computes stats for the whole table + * but does so by re-using stats from partitions which have 'valid' statistics. 
Statistics + * are 'valid' currently if they exist, in the future they may be expired based on recency + * etc. + * + * TODO: Allow more coarse/fine grained (db, column) + * TODO: Compute stats on complex types. + */ +public class ComputeStatsStmt extends StatementBase { + private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class); + + private static String AVRO_SCHEMA_MSG_PREFIX = "Cannot COMPUTE STATS on Avro table " + + "'%s' because its column definitions do not match those in the Avro schema."; + private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " + + "column definitions, e.g., using the result of 'SHOW CREATE TABLE'"; + + protected final TableName tableName_; + + // Set during analysis. + protected Table table_; + + // The Null count is not currently being used in optimization or run-time, + // and compute stats runs 2x faster in many cases when not counting NULLs. + private static final boolean COUNT_NULLS = false; + + // Query for getting the per-partition row count and the total row count. + // Set during analysis. + protected String tableStatsQueryStr_; + + // Query for getting the per-column NDVs and number of NULLs. + // Set during analysis. + protected String columnStatsQueryStr_; + + // If true, stats will be gathered incrementally per-partition. + private boolean isIncremental_ = false; + + // If true, expect the compute stats process to produce output for all partitions in the + // target table (only meaningful, therefore, if partitioned). This is always true for + // non-incremental computations. If set, expectedPartitions_ will be empty - the point + // of this flag is to optimise the case where all partitions are targeted. + private boolean expectAllPartitions_ = false; + + // The list of valid partition statistics that can be used in an incremental computation + // without themselves being recomputed. Populated in analyze(). + private final List<TPartitionStats> validPartStats_ = Lists.newArrayList(); + + // For incremental computations, the list of partitions (identified by list of partition + // column values) that we expect to receive results for. Used to ensure that even empty + // partitions emit results. + // TODO: Consider using partition IDs (and adding them to the child queries with a + // PARTITION_ID() builtin) + private final List<List<String>> expectedPartitions_ = Lists.newArrayList(); + + // If non-null, the partition that an incremental computation might apply to. Must be + // null if this is a non-incremental computation. + private PartitionSpec partitionSpec_ = null; + + // The maximum number of partitions that may be explicitly selected by filter + // predicates. Any query that selects more than this automatically drops back to a full + // incremental stats recomputation. + // TODO: We can probably do better than this, e.g. running several queries, each of + // which selects up to MAX_INCREMENTAL_PARTITIONS partitions. + private static final int MAX_INCREMENTAL_PARTITIONS = 1000; + + /** + * Constructor for the non-incremental form of COMPUTE STATS. + */ + protected ComputeStatsStmt(TableName tableName) { + this(tableName, false, null); + } + + /** + * Constructor for the incremental form of COMPUTE STATS. If isIncremental is true, + * statistics will be recomputed incrementally; if false they will be recomputed for the + * whole table. The partition spec partSpec can specify a single partition whose stats + * should be recomputed. 
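+ * For example (hypothetical), "COMPUTE INCREMENTAL STATS t PARTITION (day='2015-01-01')"
+ * corresponds to isIncremental == true with a single-partition partSpec.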
+ */ + protected ComputeStatsStmt(TableName tableName, boolean isIncremental, + PartitionSpec partSpec) { + Preconditions.checkState(tableName != null && !tableName.isEmpty()); + Preconditions.checkState(isIncremental || partSpec == null); + this.tableName_ = tableName; + this.table_ = null; + this.isIncremental_ = isIncremental; + this.partitionSpec_ = partSpec; + if (partitionSpec_ != null) { + partitionSpec_.setTableName(tableName); + partitionSpec_.setPrivilegeRequirement(Privilege.ALTER); + } + } + + /** + * Utility method for constructing the child queries to add partition columns to both a + * select list and a group-by list; the former are wrapped in a cast to a string. + */ + private void addPartitionCols(HdfsTable table, List<String> selectList, + List<String> groupByCols) { + for (int i = 0; i < table.getNumClusteringCols(); ++i) { + String colRefSql = ToSqlUtils.getIdentSql(table.getColumns().get(i).getName()); + groupByCols.add(colRefSql); + // For the select list, wrap the group by columns in a cast to string because + // the Metastore stores them as strings. + selectList.add(colRefSql); + } + } + + private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) { + List<String> columnStatsSelectList = Lists.newArrayList(); + // For Hdfs tables, exclude partition columns from stats gathering because Hive + // cannot store them as part of the non-partition column stats. For HBase tables, + // include the single clustering column (the row key). + int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols(); + final String ndvUda = isIncremental_ ? "NDV_NO_FINALIZE" : "NDV"; + + for (int i = startColIdx; i < table_.getColumns().size(); ++i) { + Column c = table_.getColumns().get(i); + Type type = c.getType(); + + // Ignore columns with an invalid/unsupported type. For example, complex types in + // an HBase-backed table will appear as invalid types. + if (!type.isValid() || !type.isSupported() + || c.getType().isComplexType()) { + continue; + } + // NDV approximation function. Add explicit alias for later identification when + // updating the Metastore. + String colRefSql = ToSqlUtils.getIdentSql(c.getName()); + columnStatsSelectList.add(ndvUda + "(" + colRefSql + ") AS " + colRefSql); + + if (COUNT_NULLS) { + // Count the number of NULL values. + columnStatsSelectList.add("COUNT(IF(" + colRefSql + " IS NULL, 1, NULL))"); + } else { + // Using -1 to indicate "unknown". We need cast to BIGINT because backend expects + // an i64Val as the number of NULLs returned by the COMPUTE STATS column stats + // child query. See CatalogOpExecutor::SetColumnStats(). If we do not cast, then + // the -1 will be treated as TINYINT resulting a 0 to be placed in the #NULLs + // column (see IMPALA-1068). + columnStatsSelectList.add("CAST(-1 as BIGINT)"); + } + + // For STRING columns also compute the max and avg string length. + if (type.isStringType()) { + columnStatsSelectList.add("MAX(length(" + colRefSql + "))"); + columnStatsSelectList.add("AVG(length(" + colRefSql + "))"); + } else { + // For non-STRING columns we use the fixed size of the type. + // We store the same information for all types to avoid having to + // treat STRING columns specially in the BE CatalogOpExecutor. 
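+        // For example, an INT column contributes "4" and "CAST(4 as DOUBLE)"
+        // (its 4-byte slot size) as the max-width and avg-width entries.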
+ Integer typeSize = type.getPrimitiveType().getSlotSize(); + columnStatsSelectList.add(typeSize.toString()); + columnStatsSelectList.add("CAST(" + typeSize.toString() + " as DOUBLE)"); + } + + if (isIncremental_) { + // Need the count in order to properly combine per-partition column stats + columnStatsSelectList.add("COUNT(" + colRefSql + ")"); + } + } + return columnStatsSelectList; + } + + /** + * Constructs two queries to compute statistics for 'tableName_', if that table exists + * (although if we can detect that no work needs to be done for either query, that query + * will be 'null' and not executed). + * + * The first query computes the number of rows (on a per-partition basis if the table is + * partitioned) and has the form "SELECT COUNT(*) FROM tbl GROUP BY part_col1, + * part_col2...", with an optional WHERE clause for incremental computation (see below). + * + * The second query computes the NDV estimate, the average width, the maximum width and, + * optionally, the number of nulls for each column. For non-partitioned tables (or + * non-incremental computations), the query is simple: + * + * SELECT NDV(col), COUNT(<nulls>), MAX(length(col)), AVG(length(col)) FROM tbl + * + * (For non-string columns, the widths are hard-coded as they are known at query + * construction time). + * + * If computation is incremental (i.e. the original statement was COMPUTE INCREMENTAL + * STATS.., and the underlying table is a partitioned HdfsTable), some modifications are + * made to the non-incremental per-column query. First, a different UDA, + * NDV_NO_FINALIZE() is used to retrieve and serialise the intermediate state from each + * column. Second, the results are grouped by partition, as with the row count query, so + * that the intermediate NDV computation state can be stored per-partition. The number + * of rows per-partition are also recorded. + * + * For both the row count query, and the column stats query, the query's WHERE clause is + * used to restrict execution only to partitions that actually require new statistics to + * be computed. + * + * SELECT NDV_NO_FINALIZE(col), <nulls, max, avg>, COUNT(col) FROM tbl + * GROUP BY part_col1, part_col2, ... + * WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR + * ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ... + */ + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + table_ = analyzer.getTable(tableName_, Privilege.ALTER); + String sqlTableName = table_.getTableName().toSql(); + if (table_ instanceof View) { + throw new AnalysisException(String.format( + "COMPUTE STATS not supported for view %s", sqlTableName)); + } + + if (!(table_ instanceof HdfsTable)) { + if (partitionSpec_ != null) { + throw new AnalysisException("COMPUTE INCREMENTAL ... 
PARTITION not supported " + + "for non-HDFS table " + table_.getTableName()); + } + isIncremental_ = false; + } + + // Ensure that we write an entry for every partition if this isn't incremental + if (!isIncremental_) expectAllPartitions_ = true; + + HdfsTable hdfsTable = null; + if (table_ instanceof HdfsTable) { + hdfsTable = (HdfsTable)table_; + if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 && + partitionSpec_ != null) { + throw new AnalysisException(String.format( + "Can't compute PARTITION stats on an unpartitioned table: %s", + sqlTableName)); + } else if (partitionSpec_ != null) { + partitionSpec_.setPartitionShouldExist(); + partitionSpec_.analyze(analyzer); + for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) { + // TODO: We could match the dynamic keys (i.e. as wildcards) as well, but that + // would involve looping over all partitions and seeing which match the + // partition spec. + if (!kv.isStatic()) { + throw new AnalysisException("All partition keys must have values: " + + kv.toString()); + } + } + } + // For incremental stats, estimate the size of intermediate stats and report an + // error if the estimate is greater than MAX_INCREMENTAL_STATS_SIZE_BYTES. + if (isIncremental_) { + long statsSizeEstimate = hdfsTable.getColumns().size() * + hdfsTable.getPartitions().size() * HdfsTable.STATS_SIZE_PER_COLUMN_BYTES; + if (statsSizeEstimate > HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES) { + LOG.error("Incremental stats size estimate for table " + hdfsTable.getName() + + " exceeded " + HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES + ", estimate = " + + statsSizeEstimate); + throw new AnalysisException("Incremental stats size estimate exceeds " + + PrintUtils.printBytes(HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES) + + ". Please try COMPUTE STATS instead."); + } + } + } + + // Build partition filters that only select partitions without valid statistics for + // incremental computation. + List<String> filterPreds = Lists.newArrayList(); + if (isIncremental_) { + if (partitionSpec_ == null) { + // If any column does not have stats, we recompute statistics for all partitions + // TODO: need a better way to invalidate stats for all partitions, so that we can + // use this logic to only recompute new / changed columns. + boolean tableIsMissingColStats = false; + + // We'll warn the user if a column is missing stats (and therefore we rescan the + // whole table), but if all columns are missing stats, the table just doesn't have + // any stats and there's no need to warn. 
+ boolean allColumnsMissingStats = true; + String exampleColumnMissingStats = null; + // Partition columns always have stats, so exclude them from this search + for (Column col: table_.getNonClusteringColumns()) { + if (!col.getStats().hasStats()) { + if (!tableIsMissingColStats) { + tableIsMissingColStats = true; + exampleColumnMissingStats = col.getName(); + } + } else { + allColumnsMissingStats = false; + } + } + + if (tableIsMissingColStats && !allColumnsMissingStats) { + analyzer.addWarning("Column " + exampleColumnMissingStats + + " does not have statistics, recomputing stats for the whole table"); + } + + for (HdfsPartition p: hdfsTable.getPartitions()) { + if (p.isDefaultPartition()) continue; + TPartitionStats partStats = p.getPartitionStats(); + if (!p.hasIncrementalStats() || tableIsMissingColStats) { + if (partStats == null) LOG.trace(p.toString() + " does not have stats"); + if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql()); + List<String> partValues = Lists.newArrayList(); + for (LiteralExpr partValue: p.getPartitionValues()) { + partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue, + "NULL")); + } + expectedPartitions_.add(partValues); + } else { + LOG.trace(p.toString() + " does have statistics"); + validPartStats_.add(partStats); + } + } + if (expectedPartitions_.size() == hdfsTable.getPartitions().size() - 1) { + expectedPartitions_.clear(); + expectAllPartitions_ = true; + } + } else { + // Always compute stats on a particular partition when told to. + List<String> partitionConjuncts = Lists.newArrayList(); + for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) { + partitionConjuncts.add(kv.toPredicateSql()); + } + filterPreds.add("(" + Joiner.on(" AND ").join(partitionConjuncts) + ")"); + HdfsPartition targetPartition = + hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues()); + List<String> partValues = Lists.newArrayList(); + for (LiteralExpr partValue: targetPartition.getPartitionValues()) { + partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue, + "NULL")); + } + expectedPartitions_.add(partValues); + for (HdfsPartition p: hdfsTable.getPartitions()) { + if (p.isDefaultPartition()) continue; + if (p == targetPartition) continue; + TPartitionStats partStats = p.getPartitionStats(); + if (partStats != null) validPartStats_.add(partStats); + } + } + + if (filterPreds.size() == 0 && validPartStats_.size() != 0) { + LOG.info("No partitions selected for incremental stats update"); + analyzer.addWarning("No partitions selected for incremental stats update"); + return; + } + } + + if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) { + // TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then + // advising the user to iterate. + analyzer.addWarning( + "Too many partitions selected, doing full recomputation of incremental stats"); + filterPreds.clear(); + validPartStats_.clear(); + } + + List<String> groupByCols = Lists.newArrayList(); + List<String> partitionColsSelectList = Lists.newArrayList(); + // Only add group by clause for HdfsTables. + if (hdfsTable != null) { + if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable); + addPartitionCols(hdfsTable, partitionColsSelectList, groupByCols); + } + + // Query for getting the per-partition row count and the total row count. 
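+    // For a hypothetical table t partitioned by (year, month) this builds, e.g.:
+    //   SELECT COUNT(*), year, month FROM t [WHERE <partition filters>] GROUP BY year, month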
+ StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT "); + List<String> tableStatsSelectList = Lists.newArrayList(); + tableStatsSelectList.add("COUNT(*)"); + + tableStatsSelectList.addAll(partitionColsSelectList); + tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList)); + tableStatsQueryBuilder.append(" FROM " + sqlTableName); + + // Query for getting the per-column NDVs and number of NULLs. + List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer); + + if (isIncremental_) columnStatsSelectList.addAll(partitionColsSelectList); + + StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT "); + columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList)); + columnStatsQueryBuilder.append(" FROM " + sqlTableName); + + // Add the WHERE clause to filter out partitions that we don't want to compute + // incremental stats for. While this is a win in most situations, we would like to + // avoid this where it does no useful work (i.e. it selects all rows). This happens + // when there are no existing valid partitions (so all partitions will have been + // selected in) and there is no partition spec (so no single partition was explicitly + // selected in). + if (filterPreds.size() > 0 && + (validPartStats_.size() > 0 || partitionSpec_ != null)) { + String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds); + columnStatsQueryBuilder.append(filterClause); + tableStatsQueryBuilder.append(filterClause); + } + + if (groupByCols.size() > 0) { + String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols); + if (isIncremental_) columnStatsQueryBuilder.append(groupBy); + tableStatsQueryBuilder.append(groupBy); + } + + tableStatsQueryStr_ = tableStatsQueryBuilder.toString(); + LOG.debug("Table stats query: " + tableStatsQueryStr_); + + if (columnStatsSelectList.isEmpty()) { + // Table doesn't have any columns that we can compute stats for. + LOG.info("No supported column types in table " + table_.getTableName() + + ", no column statistics will be gathered."); + columnStatsQueryStr_ = null; + return; + } + + columnStatsQueryStr_ = columnStatsQueryBuilder.toString(); + LOG.debug("Column stats query: " + columnStatsQueryStr_); + } + + /** + * Checks whether the column definitions from the CREATE TABLE stmt match the columns + * in the Avro schema. If there is a mismatch, then COMPUTE STATS cannot update the + * statistics in the Metastore's backend DB due to HIVE-6308. Throws an + * AnalysisException for such ill-created Avro tables. Does nothing if + * the column definitions match the Avro schema exactly. + */ + private void checkIncompleteAvroSchema(HdfsTable table) throws AnalysisException { + Preconditions.checkState(table.isAvroTable()); + org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable(); + // The column definitions from 'CREATE TABLE (column definitions) ...' + Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator(); + // The columns derived from the Avro schema file or literal schema. + // Inconsistencies between the Avro-schema columns and the column definitions + // are sometimes resolved in the CREATE TABLE, and sometimes not (see below). + Iterator<Column> avroSchemaCols = table.getColumns().iterator(); + // Skip partition columns from 'table' since those are not present in + // the msTable field schemas. 
+ for (int i = 0; i < table.getNumClusteringCols(); ++i) { + if (avroSchemaCols.hasNext()) avroSchemaCols.next(); + } + int pos = 0; + while (colDefs.hasNext() || avroSchemaCols.hasNext()) { + if (colDefs.hasNext() && avroSchemaCols.hasNext()) { + FieldSchema colDef = colDefs.next(); + Column avroSchemaCol = avroSchemaCols.next(); + // Check that the column names are identical. Ignore mismatched types + // as those will either fail in the scan or succeed. + if (!colDef.getName().equalsIgnoreCase(avroSchemaCol.getName())) { + throw new AnalysisException( + String.format(AVRO_SCHEMA_MSG_PREFIX + + "\nDefinition of column '%s' of type '%s' does not match " + + "the Avro-schema column '%s' of type '%s' at position '%s'.\n" + + AVRO_SCHEMA_MSG_SUFFIX, + table.getName(), colDef.getName(), colDef.getType(), + avroSchemaCol.getName(), avroSchemaCol.getType(), pos)); + } + } + // The following two cases are typically not possible because Hive resolves + // inconsistencies between the column-definition list and the Avro schema if a + // column-definition list was given in the CREATE TABLE (having no column + // definitions at all results in HIVE-6308). Even so, we check these cases for + // extra safety. COMPUTE STATS could be made to succeed in special instances of + // the cases below but we chose to throw an AnalysisException to avoid confusion + // because this scenario "should" never arise as mentioned above. + if (colDefs.hasNext() && !avroSchemaCols.hasNext()) { + FieldSchema colDef = colDefs.next(); + throw new AnalysisException( + String.format(AVRO_SCHEMA_MSG_PREFIX + + "\nMissing Avro-schema column corresponding to column " + + "definition '%s' of type '%s' at position '%s'.\n" + + AVRO_SCHEMA_MSG_SUFFIX, + table.getName(), colDef.getName(), colDef.getType(), pos)); + } + if (!colDefs.hasNext() && avroSchemaCols.hasNext()) { + Column avroSchemaCol = avroSchemaCols.next(); + throw new AnalysisException( + String.format(AVRO_SCHEMA_MSG_PREFIX + + "\nMissing column definition corresponding to Avro-schema " + + "column '%s' of type '%s' at position '%s'.\n" + + AVRO_SCHEMA_MSG_SUFFIX, + table.getName(), avroSchemaCol.getName(), avroSchemaCol.getType(), pos)); + } + ++pos; + } + } + + public String getTblStatsQuery() { return tableStatsQueryStr_; } + public String getColStatsQuery() { return columnStatsQueryStr_; } + + @Override + public String toSql() { + if (!isIncremental_) { + return "COMPUTE STATS " + tableName_.toSql(); + } else { + return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() + + partitionSpec_ == null ? 
"" : partitionSpec_.toSql(); + } + } + + public TComputeStatsParams toThrift() { + TComputeStatsParams params = new TComputeStatsParams(); + params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName())); + params.setTbl_stats_query(tableStatsQueryStr_); + if (columnStatsQueryStr_ != null) { + params.setCol_stats_query(columnStatsQueryStr_); + } else { + params.setCol_stats_queryIsSet(false); + } + + params.setIs_incremental(isIncremental_); + params.setExisting_part_stats(validPartStats_); + params.setExpect_all_partitions(expectAllPartitions_); + if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_); + if (isIncremental_) { + params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols()); + } + return params; + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java new file mode 100644 index 0000000..1ee6fd4 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.extdatasource.ApiVersion; +import com.cloudera.impala.thrift.TCreateDataSourceParams; +import com.cloudera.impala.thrift.TDataSource; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +/** + * Represents a CREATE DATA SOURCE statement. 
+ */ +public class CreateDataSrcStmt extends StatementBase { + private final String dataSrcName_; + private final String className_; + private final String apiVersionString_; + private final HdfsUri location_; + private final boolean ifNotExists_; + private ApiVersion apiVersion_; + + public CreateDataSrcStmt(String dataSrcName, HdfsUri location, String className, + String apiVersionString, boolean ifNotExists) { + Preconditions.checkNotNull(dataSrcName); + Preconditions.checkNotNull(className); + Preconditions.checkNotNull(apiVersionString); + Preconditions.checkNotNull(location); + dataSrcName_ = dataSrcName.toLowerCase(); + location_ = location; + className_ = className; + apiVersionString_ = apiVersionString; + ifNotExists_ = ifNotExists; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!MetaStoreUtils.validateName(dataSrcName_)) { + throw new AnalysisException("Invalid data source name: " + dataSrcName_); + } + if (!ifNotExists_ && analyzer.getCatalog().getDataSource(dataSrcName_) != null) { + throw new AnalysisException(Analyzer.DATA_SRC_ALREADY_EXISTS_ERROR_MSG + + dataSrcName_); + } + + apiVersion_ = ApiVersion.parseApiVersion(apiVersionString_); + if (apiVersion_ == null) { + throw new AnalysisException("Invalid API version: '" + apiVersionString_ + + "'. Valid API versions: " + Joiner.on(", ").join(ApiVersion.values())); + } + + location_.analyze(analyzer, Privilege.ALL, FsAction.READ); + // TODO: Check class exists and implements API version + // TODO: authorization check + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE DATA SOURCE "); + if (ifNotExists_) sb.append("IF NOT EXISTS "); + sb.append(dataSrcName_); + sb.append(" LOCATION '"); + sb.append(location_.getLocation()); + sb.append("' CLASS '"); + sb.append(className_); + sb.append("' API_VERSION '"); + sb.append(apiVersion_.name()); + sb.append("'"); + return sb.toString(); + } + + public TCreateDataSourceParams toThrift() { + return new TCreateDataSourceParams( + new TDataSource(dataSrcName_, location_.toString(), className_, + apiVersion_.name())).setIf_not_exists(ifNotExists_); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java new file mode 100644 index 0000000..3dedd8b --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.cloudera.impala.analysis; + +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.Db; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TCreateDbParams; + +/** + * Represents a CREATE DATABASE statement. + */ +public class CreateDbStmt extends StatementBase { + private final String dbName_; + private final HdfsUri location_; + private final String comment_; + private final boolean ifNotExists_; + + /** + * Creates a database with the given name. + */ + public CreateDbStmt(String dbName) { + this(dbName, null, null, false); + } + + /** + * Creates a database with the given name, comment, and HDFS table storage location. + * New tables created in the database inherit the location property for their default + * storage location. Creating the database throws an error if the database already + * exists, unless ifNotExists is true. + */ + public CreateDbStmt(String dbName, String comment, HdfsUri location, + boolean ifNotExists) { + this.dbName_ = dbName; + this.comment_ = comment; + this.location_ = location; + this.ifNotExists_ = ifNotExists; + } + + public String getComment() { return comment_; } + public String getDb() { return dbName_; } + public boolean getIfNotExists() { return ifNotExists_; } + public HdfsUri getLocation() { return location_; } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder("CREATE DATABASE"); + if (ifNotExists_) sb.append(" IF NOT EXISTS"); + sb.append(" " + dbName_); + if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'"); + if (location_ != null) sb.append(" LOCATION '" + location_ + "'"); + return sb.toString(); + } + + public TCreateDbParams toThrift() { + TCreateDbParams params = new TCreateDbParams(); + params.setDb(getDb()); + params.setComment(getComment()); + params.setLocation(location_ == null ? null : location_.toString()); + params.setIf_not_exists(getIfNotExists()); + return params; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + // Check whether the db name meets the Metastore's requirements. + if (!MetaStoreUtils.validateName(dbName_)) { + throw new AnalysisException("Invalid database name: " + dbName_); + } + + // Note: It is possible that a database with the same name was created external to + // this Impala instance. If that happens, the caller will not get an + // AnalysisException when creating the database, they will get a Hive + // AlreadyExistsException once the request has been sent to the metastore.
+ Db db = analyzer.getDb(getDb(), Privilege.CREATE, false); + if (db != null && !ifNotExists_) { + throw new AnalysisException(Analyzer.DB_ALREADY_EXISTS_ERROR_MSG + getDb()); + } + + if (location_ != null) { + location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java new file mode 100644 index 0000000..ef90b8a --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import com.cloudera.impala.catalog.Role; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TCreateDropRoleParams; +import com.google.common.base.Preconditions; + +/** + * Represents a "CREATE ROLE" or "DROP ROLE" statement. + */ +public class CreateDropRoleStmt extends AuthorizationStmt { + private final String roleName_; + private final boolean isDropRole_; + + // Set in analysis + private String user_; + + public CreateDropRoleStmt(String roleName, boolean isDropRole) { + Preconditions.checkNotNull(roleName); + roleName_ = roleName; + isDropRole_ = isDropRole; + } + + @Override + public String toSql() { + return String.format("%s ROLE %s", isDropRole_ ? "DROP" : "CREATE", roleName_);
+ } + + public TCreateDropRoleParams toThrift() { + TCreateDropRoleParams params = new TCreateDropRoleParams(); + params.setRole_name(roleName_); + params.setIs_drop(isDropRole_); + return params; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + super.analyze(analyzer); + Role existingRole = analyzer.getCatalog().getAuthPolicy().getRole(roleName_); + if (isDropRole_ && existingRole == null) { + throw new AnalysisException(String.format("Role '%s' does not exist.", roleName_)); + } else if (!isDropRole_ && existingRole != null) { + throw new AnalysisException(String.format("Role '%s' already exists.", roleName_)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java new file mode 100644 index 0000000..ebfd7b6 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.fs.permission.FsAction; + +import com.cloudera.impala.authorization.AuthorizeableFn; +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.authorization.PrivilegeRequest; +import com.cloudera.impala.catalog.Catalog; +import com.cloudera.impala.catalog.Db; +import com.cloudera.impala.catalog.Function; +import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TCreateFunctionParams; +import com.cloudera.impala.thrift.TFunctionBinaryType; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Base class for CREATE [] FUNCTION. + */ +public abstract class CreateFunctionStmtBase extends StatementBase { + + // Enums for valid keys for optional arguments.
+ public enum OptArg { + COMMENT, + SYMBOL, // Only used for Udfs + PREPARE_FN, // Only used for Udfs + CLOSE_FN, // Only used for Udfs + UPDATE_FN, // Only used for Udas + INIT_FN, // Only used for Udas + SERIALIZE_FN, // Only used for Udas + MERGE_FN, // Only used for Udas + FINALIZE_FN // Only used for Udas + }; + + protected final FunctionName fnName_; + protected final FunctionArgs args_; + protected final TypeDef retTypeDef_; + protected final HdfsUri location_; + protected final HashMap<CreateFunctionStmtBase.OptArg, String> optArgs_; + protected final boolean ifNotExists_; + + // Result of analysis. + protected Function fn_; + + // Db object for function fn_. Set in analyze(). + protected Db db_; + + // Set in analyze() + protected String sqlString_; + + protected CreateFunctionStmtBase(FunctionName fnName, FunctionArgs args, + TypeDef retTypeDef, HdfsUri location, boolean ifNotExists, + HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) { + // The return and arg types must either be both null or non-null. + Preconditions.checkState(!(args == null ^ retTypeDef == null)); + fnName_ = fnName; + args_ = args; + retTypeDef_ = retTypeDef; + location_ = location; + ifNotExists_ = ifNotExists; + optArgs_ = optArgs; + } + + public String getComment() { return optArgs_.get(OptArg.COMMENT); } + public boolean getIfNotExists() { return ifNotExists_; } + public boolean hasSignature() { return args_ != null; } + + public TCreateFunctionParams toThrift() { + TCreateFunctionParams params = new TCreateFunctionParams(fn_.toThrift()); + params.setIf_not_exists(getIfNotExists()); + params.setFn(fn_.toThrift()); + return params; + } + + // Returns optArg[key], first validating that it is set. + protected String checkAndGetOptArg(OptArg key) + throws AnalysisException { + if (!optArgs_.containsKey(key)) { + throw new AnalysisException("Argument '" + key + "' must be set."); + } + return optArgs_.get(key); + } + + protected void checkOptArgNotSet(OptArg key) + throws AnalysisException { + if (optArgs_.containsKey(key)) { + throw new AnalysisException("Optional argument '" + key + "' should not be set."); + } + } + + // Returns the function's binary type based on the path extension. + private TFunctionBinaryType getBinaryType() throws AnalysisException { + TFunctionBinaryType binaryType = null; + String binaryPath = fn_.getLocation().getLocation(); + int suffixIndex = binaryPath.lastIndexOf("."); + if (suffixIndex != -1) { + String suffix = binaryPath.substring(suffixIndex + 1); + if (suffix.equalsIgnoreCase("jar")) { + binaryType = TFunctionBinaryType.JAVA; + } else if (suffix.equalsIgnoreCase("so")) { + binaryType = TFunctionBinaryType.NATIVE; + } else if (suffix.equalsIgnoreCase("ll")) { + binaryType = TFunctionBinaryType.IR; + } + } + if (binaryType == null) { + throw new AnalysisException("Unknown binary type: '" + binaryPath + + "'. Binary must end in .jar, .so or .ll"); + } + return binaryType; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + // Validate function name is legal + fnName_.analyze(analyzer); + + if (hasSignature()) { + // Validate function arguments and return type. + args_.analyze(analyzer); + retTypeDef_.analyze(analyzer); + fn_ = createFunction(fnName_, args_.getArgTypes(), retTypeDef_.getType(), + args_.hasVarArgs()); + } else { + fn_ = createFunction(fnName_, null, null, false); + } + + // For now, if authorization is enabled, the user needs ALL on the server + // to create functions. 
+ // TODO: this is not the right granularity but acceptable for now. + analyzer.registerPrivReq(new PrivilegeRequest( + new AuthorizeableFn(fn_.signatureString()), Privilege.ALL)); + + Db builtinsDb = analyzer.getCatalog().getDb(Catalog.BUILTINS_DB); + if (builtinsDb.containsFunction(fn_.getName())) { + throw new AnalysisException("Function cannot have the same name as a builtin: " + + fn_.getFunctionName().getFunction()); + } + + db_ = analyzer.getDb(fn_.dbName(), Privilege.CREATE); + Function existingFn = db_.getFunction(fn_, Function.CompareMode.IS_INDISTINGUISHABLE); + if (existingFn != null && !ifNotExists_) { + throw new AnalysisException(Analyzer.FN_ALREADY_EXISTS_ERROR_MSG + + existingFn.signatureString()); + } + + location_.analyze(analyzer, Privilege.CREATE, FsAction.READ); + fn_.setLocation(location_); + + // Check the file type from the binary type to infer the type of the UDA + fn_.setBinaryType(getBinaryType()); + + // Forbid unsupported and complex types. + if (hasSignature()) { + List<Type> refdTypes = Lists.newArrayList(fn_.getReturnType()); + refdTypes.addAll(Lists.newArrayList(fn_.getArgs())); + for (Type t: refdTypes) { + if (!t.isSupported() || t.isComplexType()) { + throw new AnalysisException( + String.format("Type '%s' is not supported in UDFs/UDAs.", t.toSql())); + } + } + } else if (fn_.getBinaryType() != TFunctionBinaryType.JAVA) { + throw new AnalysisException( + String.format("Native functions require a return type and/or " + + "argument types: %s", fn_.getFunctionName())); + } + + // Check if the function can be persisted. We persist all native/IR functions + // and also JAVA functions added without signature. Only JAVA functions added + // with signatures aren't persisted. + if (getBinaryType() == TFunctionBinaryType.JAVA && hasSignature()) { + fn_.setIsPersistent(false); + } else { + fn_.setIsPersistent(true); + } + } + + /** + * Creates a concrete function. + */ + protected abstract Function createFunction(FunctionName fnName, + ArrayList<Type> argTypes, Type retType, boolean hasVarArgs); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java new file mode 100644 index 0000000..cc04b04 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.cloudera.impala.analysis; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TCreateOrAlterViewParams; +import com.cloudera.impala.thrift.TTableName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Base class for CREATE VIEW and ALTER VIEW AS SELECT statements. + */ +public abstract class CreateOrAlterViewStmtBase extends StatementBase { + private final static Logger LOG = + LoggerFactory.getLogger(CreateOrAlterViewStmtBase.class); + + protected final boolean ifNotExists_; + protected final TableName tableName_; + protected final ArrayList<ColumnDef> columnDefs_; + protected final String comment_; + protected final QueryStmt viewDefStmt_; + + // Set during analysis + protected String dbName_; + protected String owner_; + + // The original SQL-string given as view definition. Set during analysis. + // Corresponds to Hive's viewOriginalText. + protected String originalViewDef_; + + // Query statement (as SQL string) that defines the View for view substitution. + // It is a transformation of the original view definition, e.g., to enforce the + // columnDefs even if the original view definition has explicit column aliases. + // If column definitions were given, then this "expanded" view definition + // wraps the original view definition in a select stmt as follows. + // + // SELECT viewName.origCol1 AS colDesc1, viewName.origCol2 AS colDesc2, ... + // FROM (originalViewDef) AS viewName + // + // Corresponds to Hive's viewExpandedText, but is not identical to the SQL + // Hive would produce in view creation. + protected String inlineViewDef_; + + // Columns to use in the select list of the expanded SQL string and when registering + // this view in the metastore. Set in analysis. + protected ArrayList<ColumnDef> finalColDefs_; + + public CreateOrAlterViewStmtBase(boolean ifNotExists, TableName tableName, + ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) { + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(viewDefStmt); + this.ifNotExists_ = ifNotExists; + this.tableName_ = tableName; + this.columnDefs_ = columnDefs; + this.comment_ = comment; + this.viewDefStmt_ = viewDefStmt; + } + + /** + * Sets the originalViewDef and the expanded inlineViewDef based on viewDefStmt. + * If columnDefs were given, checks that they do not contain duplicate column names + * and throws an exception if they do. + */ + protected void createColumnAndViewDefs(Analyzer analyzer) throws AnalysisException { + Preconditions.checkNotNull(dbName_); + Preconditions.checkNotNull(owner_); + + // Set the finalColDefs to reflect the given column definitions. + if (columnDefs_ != null) { + Preconditions.checkState(!columnDefs_.isEmpty()); + if (columnDefs_.size() != viewDefStmt_.getColLabels().size()) { + String cmp = + (columnDefs_.size() > viewDefStmt_.getColLabels().size()) ? 
"more" : "fewer"; + throw new AnalysisException(String.format("Column-definition list has " + + "%s columns (%s) than the view-definition query statement returns (%s).", + cmp, columnDefs_.size(), viewDefStmt_.getColLabels().size())); + } + + finalColDefs_ = columnDefs_; + Preconditions.checkState( + columnDefs_.size() == viewDefStmt_.getBaseTblResultExprs().size()); + for (int i = 0; i < columnDefs_.size(); ++i) { + // Set type in the column definition from the view-definition statement. + columnDefs_.get(i).setType(viewDefStmt_.getBaseTblResultExprs().get(i).getType()); + } + } else { + // Create list of column definitions from the view-definition statement. + finalColDefs_ = Lists.newArrayList(); + List<Expr> exprs = viewDefStmt_.getBaseTblResultExprs(); + List<String> labels = viewDefStmt_.getColLabels(); + Preconditions.checkState(exprs.size() == labels.size()); + for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) { + ColumnDef colDef = new ColumnDef(labels.get(i), null, null); + colDef.setType(exprs.get(i).getType()); + finalColDefs_.add(colDef); + } + } + + // Check that the column definitions have valid names, and that there are no + // duplicate column names. + Set<String> distinctColNames = Sets.newHashSet(); + for (ColumnDef colDesc: finalColDefs_) { + colDesc.analyze(); + if (!distinctColNames.add(colDesc.getColName().toLowerCase())) { + throw new AnalysisException("Duplicate column name: " + colDesc.getColName()); + } + } + + // Set original and expanded view-definition SQL strings. + originalViewDef_ = viewDefStmt_.toSql(); + + // If no column definitions were given, then the expanded view SQL is the same + // as the original one. + if (columnDefs_ == null) { + inlineViewDef_ = originalViewDef_; + return; + } + + // Wrap the original view-definition statement into a SELECT to enforce the + // given column definitions. + StringBuilder sb = new StringBuilder(); + sb.append("SELECT "); + for (int i = 0; i < finalColDefs_.size(); ++i) { + String colRef = ToSqlUtils.getIdentSql(viewDefStmt_.getColLabels().get(i)); + String colAlias = ToSqlUtils.getIdentSql(finalColDefs_.get(i).getColName()); + sb.append(String.format("%s.%s AS %s", tableName_.getTbl(), colRef, colAlias)); + sb.append((i+1 != finalColDefs_.size()) ? ", " : ""); + } + // Do not use 'AS' for table aliases because Hive only accepts them without 'AS'. + sb.append(String.format(" FROM (%s) %s", originalViewDef_, tableName_.getTbl())); + inlineViewDef_ = sb.toString(); + } + + /** + * Computes the column lineage graph for a create/alter view statetement. + */ + protected void computeLineageGraph(Analyzer analyzer) { + ColumnLineageGraph graph = analyzer.getColumnLineageGraph(); + List<String> colDefs = Lists.newArrayList(); + for (ColumnDef colDef: finalColDefs_) { + colDefs.add(dbName_ + "." + getTbl() + "." 
+ colDef.getColName()); + } + graph.addTargetColumnLabels(colDefs); + graph.computeLineageGraph(viewDefStmt_.getResultExprs(), analyzer); + LOG.trace("lineage: " + graph.debugString()); + } + + public TCreateOrAlterViewParams toThrift() { + TCreateOrAlterViewParams params = new TCreateOrAlterViewParams(); + params.setView_name(new TTableName(getDb(), getTbl())); + for (ColumnDef col: finalColDefs_) { + params.addToColumns(col.toThrift()); + } + params.setOwner(getOwner()); + params.setIf_not_exists(getIfNotExists()); + params.setOriginal_view_def(originalViewDef_); + params.setExpanded_view_def(inlineViewDef_); + if (comment_ != null) params.setComment(comment_); + return params; + } + + /** + * Can only be called after analysis, returns the name of the database the table will + * be created within. + */ + public String getDb() { + Preconditions.checkNotNull(dbName_); + return dbName_; + } + + /** + * Can only be called after analysis, returns the owner of the view to be created. + */ + public String getOwner() { + Preconditions.checkNotNull(owner_); + return owner_; + } + + public List<ColumnDef> getColumnDescs() {return columnDefs_; } + public String getComment() { return comment_; } + public boolean getIfNotExists() { return ifNotExists_; } + public String getOriginalViewDef() { return originalViewDef_; } + public String getInlineViewDef() { return inlineViewDef_; } + public String getTbl() { return tableName_.getTbl(); } +}
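
As an illustration of the view expansion performed by createColumnAndViewDefs() above, the following standalone sketch (a hypothetical class, not part of this change) applies the same wrapping to a CREATE VIEW with explicit column aliases: given CREATE VIEW v (a, b) AS SELECT x, y FROM t, it produces SELECT v.x AS a, v.y AS b FROM (SELECT x, y FROM t) v.

import java.util.Arrays;
import java.util.List;

// Simplified, self-contained sketch of the inline-view expansion described in
// CreateOrAlterViewStmtBase.createColumnAndViewDefs(). It does not use Impala's
// ColumnDef/ToSqlUtils classes; identifier quoting is omitted.
public class ViewExpansionSketch {
  static String expand(String viewName, List<String> colAliases,
      List<String> origLabels, String originalViewDef) {
    StringBuilder sb = new StringBuilder("SELECT ");
    for (int i = 0; i < colAliases.size(); ++i) {
      sb.append(String.format("%s.%s AS %s", viewName, origLabels.get(i),
          colAliases.get(i)));
      if (i + 1 != colAliases.size()) sb.append(", ");
    }
    // No 'AS' before the table alias, mirroring the Hive-compatibility note above.
    sb.append(String.format(" FROM (%s) %s", originalViewDef, viewName));
    return sb.toString();
  }

  public static void main(String[] args) {
    // CREATE VIEW v (a, b) AS SELECT x, y FROM t
    System.out.println(expand("v", Arrays.asList("a", "b"),
        Arrays.asList("x", "y"), "SELECT x, y FROM t"));
    // Prints: SELECT v.x AS a, v.y AS b FROM (SELECT x, y FROM t) v
  }
}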

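Similarly, the suffix-based binary-type inference in CreateFunctionStmtBase.getBinaryType() maps a UDF/UDA binary's file extension to its type: .jar to JAVA, .so to NATIVE, and .ll to IR, rejecting anything else. A minimal standalone sketch of that mapping (a hypothetical class, not part of this change):

// Self-contained sketch of the extension-to-binary-type mapping used when
// creating functions; unknown suffixes are rejected, as in
// CreateFunctionStmtBase.getBinaryType().
public class BinaryTypeSketch {
  enum BinaryType { JAVA, NATIVE, IR }

  static BinaryType fromPath(String binaryPath) {
    int suffixIndex = binaryPath.lastIndexOf('.');
    if (suffixIndex != -1) {
      String suffix = binaryPath.substring(suffixIndex + 1);
      if (suffix.equalsIgnoreCase("jar")) return BinaryType.JAVA;
      if (suffix.equalsIgnoreCase("so")) return BinaryType.NATIVE;
      if (suffix.equalsIgnoreCase("ll")) return BinaryType.IR;
    }
    throw new IllegalArgumentException("Unknown binary type: '" + binaryPath
        + "'. Binary must end in .jar, .so or .ll");
  }

  public static void main(String[] args) {
    System.out.println(fromPath("hdfs:///udfs/my_udfs.jar"));  // JAVA
    System.out.println(fromPath("/opt/udfs/libudf.SO"));       // NATIVE
  }
}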