http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java deleted file mode 100644 index a00bf53..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java +++ /dev/null @@ -1,680 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import java.text.SimpleDateFormat; -import java.util.Collection; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; -import com.cloudera.impala.thrift.TEdgeType; -import com.cloudera.impala.thrift.TQueryCtx; -import com.cloudera.impala.thrift.TLineageGraph; -import com.cloudera.impala.thrift.TMultiEdge; -import com.cloudera.impala.thrift.TVertex; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; - -/** - * Represents a vertex in the column lineage graph. A Vertex may correspond to a base - * table column, a column in the destination table (for the case of INSERT or CTAS - * queries) or a result expr (labeled column of a query result set). - */ -final class Vertex implements Comparable<Vertex> { - // Unique identifier of this vertex. - private final VertexId id_; - - private final String type_ = "COLUMN"; - - // A fully-qualified column name or the label of a result expr - private final String label_; - - public Vertex(VertexId id, String label) { - Preconditions.checkNotNull(id); - Preconditions.checkNotNull(label); - id_ = id; - label_ = label; - } - public VertexId getVertexId() { return id_; } - public String getLabel() { return label_; } - public String getType() { return type_; } - - @Override - public String toString() { return "(" + id_ + ":" + type_ + ":" + label_ + ")"; } - - /** - * Encodes this Vertex object into a JSON object represented by a Map. 
- */ - public Map toJson() { - // Use a LinkedHashMap to generate a strict ordering of elements. - Map obj = new LinkedHashMap(); - obj.put("id", id_.asInt()); - obj.put("vertexType", type_); - obj.put("vertexId", label_); - return obj; - } - - /** - * Constructs a Vertex object from a JSON object. The new object is returned. - */ - public static Vertex fromJsonObj(JSONObject obj) { - int id = ((Long) obj.get("id")).intValue(); - String label = (String) obj.get("vertexId"); - return new Vertex(new VertexId(id), label); - } - - /** - * Encodes this Vertex object into a thrift object - */ - public TVertex toThrift() { - return new TVertex(id_.asInt(), label_); - } - - /** - * Constructs a Vertex object from a thrift object. - */ - public static Vertex fromThrift(TVertex vertex) { - int id = ((Long) vertex.id).intValue(); - return new Vertex(new VertexId(id), vertex.label); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) return false; - if (obj.getClass() != this.getClass()) return false; - Vertex vertex = (Vertex) obj; - return this.id_.equals(vertex.id_) && - this.label_.equals(vertex.label_); - } - - public int compareTo(Vertex cmp) { return this.id_.compareTo(cmp.id_); } - - @Override - public int hashCode() { return id_.hashCode(); } -} - -/** - * Represents the unique identifier of a Vertex. - */ -class VertexId extends Id<VertexId> { - protected VertexId(int id) { - super(id); - } - public static IdGenerator<VertexId> createGenerator() { - return new IdGenerator<VertexId>() { - @Override - public VertexId getNextId() { return new VertexId(nextId_++); } - @Override - public VertexId getMaxId() { return new VertexId(nextId_ - 1); } - }; - } -} - -/** - * Represents a set of uni-directional edges in the column lineage graph, one edge from - * every source Vertex in 'sources_' to every target Vertex in 'targets_'. An edge - * indicates a dependency between a source and a target Vertex. There are two types of - * edges, PROJECTION and PREDICATE, that are described in the ColumnLineageGraph class. - */ -final class MultiEdge { - public static enum EdgeType { - PROJECTION, PREDICATE - } - private final Set<Vertex> sources_; - private final Set<Vertex> targets_; - private final EdgeType edgeType_; - - public MultiEdge(Set<Vertex> sources, Set<Vertex> targets, EdgeType type) { - sources_ = sources; - targets_ = targets; - edgeType_ = type; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - Joiner joiner = Joiner.on(","); - builder.append("Sources: ["); - builder.append(joiner.join(sources_) + "]\n"); - builder.append("Targets: ["); - builder.append(joiner.join(targets_) + "]\n"); - builder.append("Type: " + edgeType_); - return builder.toString(); - } - - /** - * Encodes this MultiEdge object to a JSON object represented by a Map. 
- */ - public Map toJson() { - Map obj = new LinkedHashMap(); - // Add sources - JSONArray sourceIds = new JSONArray(); - for (Vertex vertex: sources_) { - sourceIds.add(vertex.getVertexId()); - } - obj.put("sources", sourceIds); - // Add targets - JSONArray targetIds = new JSONArray(); - for (Vertex vertex: targets_) { - targetIds.add(vertex.getVertexId()); - } - obj.put("targets", targetIds); - obj.put("edgeType", edgeType_.toString()); - return obj; - } - - /** - * Encodes this MultiEdge object to a thrift object - */ - public TMultiEdge toThrift() { - List<TVertex> sources = Lists.newArrayList(); - for (Vertex vertex: sources_) { - sources.add(vertex.toThrift()); - } - List<TVertex> targets = Lists.newArrayList(); - for (Vertex vertex: targets_) { - targets.add(vertex.toThrift()); - } - if (edgeType_ == EdgeType.PROJECTION) { - return new TMultiEdge(sources, targets, TEdgeType.PROJECTION); - } - return new TMultiEdge(sources, targets, TEdgeType.PREDICATE); - } - - /** - * Constructs a MultiEdge object from a thrift object - */ - public static MultiEdge fromThrift(TMultiEdge obj){ - Set<Vertex> sources = Sets.newHashSet(); - for (TVertex vertex: obj.sources) { - sources.add(Vertex.fromThrift(vertex)); - } - Set<Vertex> targets = Sets.newHashSet(); - for (TVertex vertex: obj.targets) { - targets.add(Vertex.fromThrift(vertex)); - } - if (obj.edgetype == TEdgeType.PROJECTION) { - return new MultiEdge(sources, targets, EdgeType.PROJECTION); - } - return new MultiEdge(sources, targets, EdgeType.PREDICATE); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) return false; - if (obj.getClass() != this.getClass()) return false; - MultiEdge edge = (MultiEdge) obj; - return edge.sources_.equals(this.sources_) && - edge.targets_.equals(this.targets_) && - edge.edgeType_ == this.edgeType_; - } -} - -/** - * Represents the column lineage graph of a query. This is a directional graph that is - * used to track dependencies among the table/column entities that participate in - * a query. There are two types of dependencies that are represented as edges in the - * column lineage graph: - * a) Projection dependency: This is a dependency between a set of source - * columns (base table columns) and a single target (result expr or table column). - * This dependency indicates that values of the target depend on the values of the source - * columns. - * b) Predicate dependency: This is a dependency between a set of target - * columns (or exprs) and a set of source columns (base table columns). It indicates that - * the source columns restrict the values of their targets (e.g. by participating in - * WHERE clause predicates). - * - * The following dependencies are generated for a query: - * - Exactly one projection dependency for every result expr / target column. - * - Exactly one predicate dependency that targets all result exprs / target cols and - * depends on all columns participating in a conjunct in the query. - * - Special case of analytic fns: One predicate dependency per result expr / target col - * whose value is directly or indirectly affected by an analytic function with a - * partition by and/or order by clause. 
- */ -public class ColumnLineageGraph { - private final static Logger LOG = LoggerFactory.getLogger(ColumnLineageGraph.class); - // Query statement - private String queryStr_; - - // Name of the user that issued this query - private String user_; - - private final List<Expr> resultDependencyPredicates_ = Lists.newArrayList(); - - private final List<MultiEdge> edges_ = Lists.newArrayList(); - - // Timestamp in seconds since epoch (GMT) this query was submitted for execution. - private long timestamp_; - - // Map of Vertex labels to Vertex objects. - private final Map<String, Vertex> vertices_ = Maps.newHashMap(); - - // Map of Vertex ids to Vertex objects. Used primarily during the construction of the - // ColumnLineageGraph from a serialized JSON object. - private final Map<VertexId, Vertex> idToVertexMap_ = Maps.newHashMap(); - - // For an INSERT or a CTAS, these are the columns of the - // destination table plus any partitioning columns (when dynamic partitioning is used). - // For a SELECT stmt, they are the labels of the result exprs. - private final List<String> targetColumnLabels_ = Lists.newArrayList(); - - // Repository for tuple and slot descriptors for this query. Use it to construct the - // column lineage graph. - private DescriptorTable descTbl_; - - private final IdGenerator<VertexId> vertexIdGenerator = VertexId.createGenerator(); - - public ColumnLineageGraph() { } - - /** - * Private c'tor, used only for testing. - */ - private ColumnLineageGraph(String stmt, String user, long timestamp) { - queryStr_ = stmt; - user_ = user; - timestamp_ = timestamp; - } - - private void setVertices(Set<Vertex> vertices) { - for (Vertex vertex: vertices) { - vertices_.put(vertex.getLabel(), vertex); - idToVertexMap_.put(vertex.getVertexId(), vertex); - } - } - - /** - * Creates a new MultiEdge in the column lineage graph from the sets of 'sources' and - * 'targets' labels (representing column names or result expr labels). The new - * MultiEdge object is returned. - */ - private MultiEdge createMultiEdge(Set<String> targets, Set<String> sources, - MultiEdge.EdgeType type) { - Set<Vertex> targetVertices = Sets.newHashSet(); - for (String target: targets) { - targetVertices.add(createVertex(target)); - } - Set<Vertex> sourceVertices = Sets.newHashSet(); - for (String source: sources) { - sourceVertices.add(createVertex(source)); - } - MultiEdge edge = new MultiEdge(sourceVertices, targetVertices, type); - edges_.add(edge); - return edge; - } - - /** - * Creates a new vertex in the column lineage graph. The new Vertex object is - * returned. If a Vertex with the same label already exists, reuse it. - */ - private Vertex createVertex(String label) { - Vertex newVertex = vertices_.get(label); - if (newVertex != null) return newVertex; - newVertex = new Vertex(vertexIdGenerator.getNextId(), label); - vertices_.put(newVertex.getLabel(), newVertex); - idToVertexMap_.put(newVertex.getVertexId(), newVertex); - return newVertex; - } - - /** - * Computes the column lineage graph of a query from the list of query result exprs. - * 'rootAnalyzer' is the Analyzer that was used for the analysis of the query. - */ - public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) { - init(rootAnalyzer); - computeProjectionDependencies(resultExprs); - computeResultPredicateDependencies(rootAnalyzer); - } - - /** - * Initialize the ColumnLineageGraph from the root analyzer of a query. 
- */ - private void init(Analyzer analyzer) { - Preconditions.checkNotNull(analyzer); - Preconditions.checkState(analyzer.isRootAnalyzer()); - TQueryCtx queryCtx = analyzer.getQueryCtx(); - if (queryCtx.request.isSetRedacted_stmt()) { - queryStr_ = queryCtx.request.redacted_stmt; - } else { - queryStr_ = queryCtx.request.stmt; - } - Preconditions.checkNotNull(queryStr_); - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - try { - timestamp_ = df.parse(queryCtx.now_string).getTime() / 1000; - } catch (java.text.ParseException e) { - LOG.error("Error parsing timestamp value: " + queryCtx.now_string + - " " + e.getMessage()); - timestamp_ = new Date().getTime() / 1000; - } - descTbl_ = analyzer.getDescTbl(); - user_ = analyzer.getUser().getName(); - } - - private void computeProjectionDependencies(List<Expr> resultExprs) { - Preconditions.checkNotNull(resultExprs); - Preconditions.checkState(!resultExprs.isEmpty()); - Preconditions.checkState(resultExprs.size() == targetColumnLabels_.size()); - for (int i = 0; i < resultExprs.size(); ++i) { - Expr expr = resultExprs.get(i); - Set<String> sourceBaseCols = Sets.newHashSet(); - List<Expr> dependentExprs = Lists.newArrayList(); - getSourceBaseCols(expr, sourceBaseCols, dependentExprs, false); - Set<String> targets = Sets.newHashSet(targetColumnLabels_.get(i)); - createMultiEdge(targets, sourceBaseCols, MultiEdge.EdgeType.PROJECTION); - if (!dependentExprs.isEmpty()) { - // We have additional exprs that 'expr' has a predicate dependency on. - // Gather the transitive predicate dependencies of 'expr' based on its direct - // predicate dependencies. For each direct predicate dependency p, 'expr' is - // transitively predicate dependent on all exprs that p is projection and - // predicate dependent on. - Set<String> predicateBaseCols = Sets.newHashSet(); - for (Expr dependentExpr: dependentExprs) { - getSourceBaseCols(dependentExpr, predicateBaseCols, null, true); - } - createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE); - } - } - } - - /** - * Compute predicate dependencies for the query result, i.e. exprs that affect the - * possible values of the result exprs / target columns, such as predicates in a WHERE - * clause. - */ - private void computeResultPredicateDependencies(Analyzer analyzer) { - List<Expr> conjuncts = analyzer.getConjuncts(); - for (Expr expr: conjuncts) { - if (expr.isAuxExpr()) continue; - resultDependencyPredicates_.add(expr); - } - Set<String> predicateBaseCols = Sets.newHashSet(); - for (Expr expr: resultDependencyPredicates_) { - getSourceBaseCols(expr, predicateBaseCols, null, true); - } - if (predicateBaseCols.isEmpty()) return; - Set<String> targets = Sets.newHashSet(targetColumnLabels_); - createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE); - } - - /** - * Identify the base table columns that 'expr' is connected to by recursively resolving - * all associated slots through inline views and materialization points to base-table - * slots. If 'directPredDeps' is not null, it is populated with the exprs that - * have a predicate dependency with 'expr' (e.g. partitioning and order by exprs for - * the case of an analytic function). If 'traversePredDeps' is false, not all the - * children exprs of 'expr' are used to identify the base columns that 'expr' is - * connected to. Which children are filtered depends on the type of 'expr' (e.g. for - * AnalyticFunctionExpr, grouping and sorting exprs are filtered out). 
- */ - private void getSourceBaseCols(Expr expr, Set<String> sourceBaseCols, - List<Expr> directPredDeps, boolean traversePredDeps) { - List<Expr> exprsToTraverse = getProjectionDeps(expr); - List<Expr> predicateDepExprs = getPredicateDeps(expr); - if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs); - if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs); - List<SlotId> slotIds = Lists.newArrayList(); - for (Expr e: exprsToTraverse) { - e.getIds(null, slotIds); - } - for (SlotId slotId: slotIds) { - SlotDescriptor slotDesc = descTbl_.getSlotDesc(slotId); - List<Expr> sourceExprs = slotDesc.getSourceExprs(); - if (sourceExprs.isEmpty() && slotDesc.isScanSlot() && - slotDesc.getPath().isRootedAtTuple()) { - // slot should correspond to a materialized tuple of a table - Preconditions.checkState(slotDesc.getParent().isMaterialized()); - List<String> path = slotDesc.getPath().getCanonicalPath(); - sourceBaseCols.add(Joiner.on(".").join(path)); - } else { - for (Expr sourceExpr: sourceExprs) { - getSourceBaseCols(sourceExpr, sourceBaseCols, directPredDeps, - traversePredDeps); - } - } - } - } - - /** - * Retrieve the exprs that 'e' is directly projection dependent on. - * TODO Handle conditional exprs (e.g. CASE, IF). - */ - private List<Expr> getProjectionDeps(Expr e) { - Preconditions.checkNotNull(e); - List<Expr> outputExprs = Lists.newArrayList(); - if (e instanceof AnalyticExpr) { - AnalyticExpr analytic = (AnalyticExpr) e; - outputExprs.addAll(analytic.getChildren().subList(0, - analytic.getFnCall().getParams().size())); - } else { - outputExprs.add(e); - } - return outputExprs; - } - - /** - * Retrieve the exprs that 'e' is directly predicate dependent on. - * TODO Handle conditional exprs (e.g. CASE, IF). - */ - private List<Expr> getPredicateDeps(Expr e) { - Preconditions.checkNotNull(e); - List<Expr> outputExprs = Lists.newArrayList(); - if (e instanceof AnalyticExpr) { - AnalyticExpr analyticExpr = (AnalyticExpr) e; - outputExprs.addAll(analyticExpr.getPartitionExprs()); - for (OrderByElement orderByElem: analyticExpr.getOrderByElements()) { - outputExprs.add(orderByElem.getExpr()); - } - } - return outputExprs; - } - - public void addDependencyPredicates(Collection<Expr> exprs) { - resultDependencyPredicates_.addAll(exprs); - } - - /** - * Encodes the ColumnLineageGraph object to JSON. 
- */ - public String toJson() { - if (Strings.isNullOrEmpty(queryStr_)) return ""; - Map obj = new LinkedHashMap(); - obj.put("queryText", queryStr_); - obj.put("hash", getQueryHash(queryStr_)); - obj.put("user", user_); - obj.put("timestamp", timestamp_); - // Add edges - JSONArray edges = new JSONArray(); - for (MultiEdge edge: edges_) { - edges.add(edge.toJson()); - } - obj.put("edges", edges); - // Add vertices - TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values()); - JSONArray vertices = new JSONArray(); - for (Vertex vertex: sortedVertices) { - vertices.add(vertex.toJson()); - } - obj.put("vertices", vertices); - return JSONValue.toJSONString(obj); - } - - /** - * Serializes the ColumnLineageGraph to a thrift object - */ - public TLineageGraph toThrift() { - TLineageGraph graph = new TLineageGraph(); - if (Strings.isNullOrEmpty(queryStr_)) return graph; - graph.setQuery_text(queryStr_); - graph.setHash(getQueryHash(queryStr_)); - graph.setUser(user_); - graph.setStarted(timestamp_); - // Add edges - List<TMultiEdge> edges = Lists.newArrayList(); - for (MultiEdge edge: edges_) { - edges.add(edge.toThrift()); - } - graph.setEdges(edges); - // Add vertices - TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values()); - List<TVertex> vertices = Lists.newArrayList(); - for (Vertex vertex: sortedVertices) { - vertices.add(vertex.toThrift()); - } - graph.setVertices(vertices); - return graph; - } - - /** - * Creates a LineageGraph object from a thrift object - */ - public static ColumnLineageGraph fromThrift(TLineageGraph obj) { - ColumnLineageGraph lineage = - new ColumnLineageGraph(obj.query_text, obj.user, obj.started); - TreeSet<Vertex> vertices = Sets.newTreeSet(); - for (TVertex vertex: obj.vertices) { - vertices.add(Vertex.fromThrift(vertex)); - } - lineage.setVertices(vertices); - for (TMultiEdge edge: obj.edges) { - MultiEdge e = MultiEdge.fromThrift(edge); - lineage.edges_.add(e); - } - return lineage; - } - - private String getQueryHash(String queryStr) { - Hasher hasher = Hashing.md5().newHasher(); - hasher.putString(queryStr); - return hasher.hash().toString(); - } - - /** - * Creates a ColumnLineageGraph object from a serialized JSON record. The new - * ColumnLineageGraph object is returned. Used only during testing. 
- */ - public static ColumnLineageGraph createFromJSON(String json) { - if (json == null || json.isEmpty()) return null; - JSONParser parser = new JSONParser(); - Object obj = null; - try { - obj = parser.parse(json); - } catch (ParseException e) { - LOG.error("Error parsing serialized column lineage graph: " + e.getMessage()); - return null; - } - if (!(obj instanceof JSONObject)) return null; - JSONObject jsonObj = (JSONObject) obj; - String stmt = (String) jsonObj.get("queryText"); - String hash = (String) jsonObj.get("hash"); - String user = (String) jsonObj.get("user"); - long timestamp = (Long) jsonObj.get("timestamp"); - ColumnLineageGraph graph = new ColumnLineageGraph(stmt, user, timestamp); - JSONArray serializedVertices = (JSONArray) jsonObj.get("vertices"); - Set<Vertex> vertices = Sets.newHashSet(); - for (int i = 0; i < serializedVertices.size(); ++i) { - Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i)); - vertices.add(v); - } - graph.setVertices(vertices); - JSONArray serializedEdges = (JSONArray) jsonObj.get("edges"); - for (int i = 0; i < serializedEdges.size(); ++i) { - MultiEdge e = - graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i)); - graph.edges_.add(e); - } - return graph; - } - - private MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge) { - Preconditions.checkNotNull(jsonEdge); - JSONArray sources = (JSONArray) jsonEdge.get("sources"); - Set<Vertex> sourceVertices = getVerticesFromJSONArray(sources); - JSONArray targets = (JSONArray) jsonEdge.get("targets"); - Set<Vertex> targetVertices = getVerticesFromJSONArray(targets); - MultiEdge.EdgeType type = - MultiEdge.EdgeType.valueOf((String) jsonEdge.get("edgeType")); - return new MultiEdge(sourceVertices, targetVertices, type); - } - - private Set<Vertex> getVerticesFromJSONArray(JSONArray vertexIdArray) { - Set<Vertex> vertices = Sets.newHashSet(); - for (int i = 0; i < vertexIdArray.size(); ++i) { - int sourceId = ((Long) vertexIdArray.get(i)).intValue(); - Vertex sourceVertex = idToVertexMap_.get(new VertexId(sourceId)); - Preconditions.checkNotNull(sourceVertex); - vertices.add(sourceVertex); - } - return vertices; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) return false; - if (obj.getClass() != this.getClass()) return false; - ColumnLineageGraph g = (ColumnLineageGraph) obj; - if (!this.vertices_.equals(g.vertices_) || - !this.edges_.equals(g.edges_)) { - return false; - } - return true; - } - - public String debugString() { - StringBuilder builder = new StringBuilder(); - for (MultiEdge edge: edges_) { - builder.append(edge.toString() + "\n"); - } - builder.append(toJson()); - return builder.toString(); - } - - public void addTargetColumnLabels(Collection<String> columnLabels) { - Preconditions.checkNotNull(columnLabels); - targetColumnLabels_.addAll(columnLabels); - } - - public void addTargetColumnLabels(Table dstTable) { - Preconditions.checkNotNull(dstTable); - String tblFullName = dstTable.getFullName(); - for (String columnName: dstTable.getColumnNames()) { - targetColumnLabels_.add(tblFullName + "." + columnName); - } - } -}
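For a simple SELECT with a WHERE clause, the JSON emitted by ColumnLineageGraph.toJson() above has roughly the following shape. The key names come from the toJson() methods in the code; the query text, user, timestamp, hash, and vertex numbering are hypothetical, shown only to illustrate the PROJECTION/PREDICATE edge structure described in the class comment:

{
  "queryText": "select id from functional.alltypes where int_col > 10",
  "hash": "<md5 of queryText>",
  "user": "impala",
  "timestamp": 1420070400,
  "edges": [
    {"sources": [1], "targets": [0], "edgeType": "PROJECTION"},
    {"sources": [2], "targets": [0], "edgeType": "PREDICATE"}
  ],
  "vertices": [
    {"id": 0, "vertexType": "COLUMN", "vertexId": "id"},
    {"id": 1, "vertexType": "COLUMN", "vertexId": "functional.alltypes.id"},
    {"id": 2, "vertexType": "COLUMN", "vertexId": "functional.alltypes.int_col"}
  ]
}

Note that equals() above compares only the vertices and edges, which is what the createFromJSON() round-trip used in testing is checked against.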
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java b/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java deleted file mode 100644 index 4869004..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java +++ /dev/null @@ -1,216 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import java.util.ArrayList; -import java.util.List; - -import com.cloudera.impala.catalog.Db; -import com.cloudera.impala.catalog.Function.CompareMode; -import com.cloudera.impala.catalog.ScalarFunction; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.thrift.TExprNode; -import com.cloudera.impala.thrift.TExprNodeType; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * &&, ||, ! predicates. - * - */ -public class CompoundPredicate extends Predicate { - public enum Operator { - AND("AND"), - OR("OR"), - NOT("NOT"); - - private final String description; - - private Operator(String description) { - this.description = description; - } - - @Override - public String toString() { - return description; - } - } - private final Operator op_; - - public static void initBuiltins(Db db) { - // AND and OR are implemented as custom exprs, so they do not have a function symbol. - db.addBuiltin(ScalarFunction.createBuiltinOperator( - Operator.AND.name(), "", - Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN)); - db.addBuiltin(ScalarFunction.createBuiltinOperator( - Operator.OR.name(), "", - Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN)); - db.addBuiltin(ScalarFunction.createBuiltinOperator( - Operator.NOT.name(), "impala::CompoundPredicate::Not", - Lists.<Type>newArrayList(Type.BOOLEAN), Type.BOOLEAN)); - } - - public CompoundPredicate(Operator op, Expr e1, Expr e2) { - super(); - this.op_ = op; - Preconditions.checkNotNull(e1); - children_.add(e1); - Preconditions.checkArgument(op == Operator.NOT && e2 == null - || op != Operator.NOT && e2 != null); - if (e2 != null) children_.add(e2); - } - - /** - * Copy c'tor used in clone(). 
- */ - protected CompoundPredicate(CompoundPredicate other) { - super(other); - op_ = other.op_; - } - - public Operator getOp() { return op_; } - - @Override - public boolean equals(Object obj) { - if (!super.equals(obj)) return false; - return ((CompoundPredicate) obj).op_ == op_; - } - - @Override - public String debugString() { - return Objects.toStringHelper(this) - .add("op", op_) - .addValue(super.debugString()) - .toString(); - } - - @Override - public String toSqlImpl() { - if (children_.size() == 1) { - Preconditions.checkState(op_ == Operator.NOT); - return "NOT " + getChild(0).toSql(); - } else { - return getChild(0).toSql() + " " + op_.toString() + " " + getChild(1).toSql(); - } - } - - @Override - protected void toThrift(TExprNode msg) { - msg.node_type = TExprNodeType.COMPOUND_PRED; - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - if (isAnalyzed_) return; - super.analyze(analyzer); - - // Check that children are predicates. - for (Expr e: children_) { - if (!e.getType().isBoolean() && !e.getType().isNull()) { - throw new AnalysisException(String.format("Operand '%s' part of predicate " + - "'%s' should return type 'BOOLEAN' but returns type '%s'.", - e.toSql(), toSql(), e.getType().toSql())); - } - } - - fn_ = getBuiltinFunction(analyzer, op_.toString(), collectChildReturnTypes(), - CompareMode.IS_NONSTRICT_SUPERTYPE_OF); - Preconditions.checkState(fn_ != null); - Preconditions.checkState(fn_.getReturnType().isBoolean()); - castForFunctionCall(false); - if (hasChildCosts()) evalCost_ = getChildCosts() + COMPOUND_PREDICATE_COST; - - if (!getChild(0).hasSelectivity() || - (children_.size() == 2 && !getChild(1).hasSelectivity())) { - // Give up if one of our children has an unknown selectivity. - selectivity_ = -1; - return; - } - - switch (op_) { - case AND: - selectivity_ = getChild(0).selectivity_ * getChild(1).selectivity_; - break; - case OR: - selectivity_ = getChild(0).selectivity_ + getChild(1).selectivity_ - - getChild(0).selectivity_ * getChild(1).selectivity_; - break; - case NOT: - selectivity_ = 1.0 - getChild(0).selectivity_; - break; - } - selectivity_ = Math.max(0.0, Math.min(1.0, selectivity_)); - } - - /** - * Retrieve the slots bound by BinaryPredicate, InPredicate and - * CompoundPredicates in the subtree rooted at 'this'. - */ - public ArrayList<SlotRef> getBoundSlots() { - ArrayList<SlotRef> slots = Lists.newArrayList(); - for (int i = 0; i < getChildren().size(); ++i) { - if (getChild(i) instanceof BinaryPredicate || - getChild(i) instanceof InPredicate) { - slots.add(((Predicate)getChild(i)).getBoundSlot()); - } else if (getChild(i) instanceof CompoundPredicate) { - slots.addAll(((CompoundPredicate)getChild(i)).getBoundSlots()); - } - } - return slots; - } - - /** - * Negates a CompoundPredicate. - */ - @Override - public Expr negate() { - if (op_ == Operator.NOT) return getChild(0); - Expr negatedLeft = getChild(0).negate(); - Expr negatedRight = getChild(1).negate(); - Operator newOp = (op_ == Operator.OR) ? Operator.AND : Operator.OR; - return new CompoundPredicate(newOp, negatedLeft, negatedRight); - } - - /** - * Creates a conjunctive predicate from a list of exprs. 
- */
-  public static Expr createConjunctivePredicate(List<Expr> conjuncts) {
-    Expr conjunctivePred = null;
-    for (Expr expr: conjuncts) {
-      if (conjunctivePred == null) {
-        conjunctivePred = expr;
-        continue;
-      }
-      conjunctivePred = new CompoundPredicate(CompoundPredicate.Operator.AND,
-          expr, conjunctivePred);
-    }
-    return conjunctivePred;
-  }
-
-  @Override
-  public Expr clone() { return new CompoundPredicate(this); }
-
-  // Create an AND predicate between two exprs, 'lhs' and 'rhs'. If
-  // 'rhs' is null, simply return 'lhs'.
-  public static Expr createConjunction(Expr lhs, Expr rhs) {
-    if (rhs == null) return lhs;
-    return new CompoundPredicate(Operator.AND, rhs, lhs);
-  }
-}
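The selectivity arithmetic at the end of analyze() above combines the children as if they were independent events. A minimal, self-contained sketch of the three combinations, using hypothetical child selectivities (the values are invented for illustration and are not from any real table stats):

class CompoundSelectivityExample {
  public static void main(String[] args) {
    double s1 = 0.2;  // hypothetical selectivity of the left child
    double s2 = 0.5;  // hypothetical selectivity of the right child
    double and = s1 * s2;           // AND: 0.10
    double or = s1 + s2 - s1 * s2;  // OR: 0.60, by inclusion-exclusion
    double not = 1.0 - s1;          // NOT: 0.80
    System.out.printf("AND=%.2f OR=%.2f NOT=%.2f%n", and, or, not);
  }
}

As in the production code, the combined value would then be clamped into the range [0.0, 1.0].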
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java
deleted file mode 100644
index cd01713..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java
+++ /dev/null
@@ -1,553 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.log4j.Logger;
-
-import com.cloudera.impala.authorization.Privilege;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.catalog.View;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.thrift.TComputeStatsParams;
-import com.cloudera.impala.thrift.TPartitionStats;
-import com.cloudera.impala.thrift.TTableName;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Represents a COMPUTE STATS <table> and COMPUTE INCREMENTAL STATS <table> [PARTITION
- * <part_spec>] statement for statistics collection. The former statement gathers all
- * table and column stats for a given table and stores them in the Metastore via the
- * CatalogService. All existing stats for that table are replaced and no existing stats
- * are reused. The latter, incremental form, similarly computes stats for the whole table
- * but does so by re-using stats from partitions which have 'valid' statistics.
- * Statistics are currently considered 'valid' if they exist; in the future, they may be
- * expired based on recency, etc.
- *
- * TODO: Allow more coarse/fine grained (db, column)
- * TODO: Compute stats on complex types.
- */
-public class ComputeStatsStmt extends StatementBase {
-  private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class);
-
-  private static String AVRO_SCHEMA_MSG_PREFIX = "Cannot COMPUTE STATS on Avro table " +
-      "'%s' because its column definitions do not match those in the Avro schema.";
-  private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
-      "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
-
-  protected final TableName tableName_;
-
-  // Set during analysis.
-  protected Table table_;
-
-  // The Null count is not currently being used in optimization or run-time,
-  // and compute stats runs 2x faster in many cases when not counting NULLs.
-  private static final boolean COUNT_NULLS = false;
-
-  // Query for getting the per-partition row count and the total row count.
-  // Set during analysis.
-  protected String tableStatsQueryStr_;
-
-  // Query for getting the per-column NDVs and number of NULLs.
-  // Set during analysis.
-  protected String columnStatsQueryStr_;
-
-  // If true, stats will be gathered incrementally per-partition.
-  private boolean isIncremental_ = false;
-
-  // If true, expect the compute stats process to produce output for all partitions in the
-  // target table (only meaningful, therefore, if partitioned). This is always true for
-  // non-incremental computations. If set, expectedPartitions_ will be empty - the point
-  // of this flag is to optimise the case where all partitions are targeted.
-  private boolean expectAllPartitions_ = false;
-
-  // The list of valid partition statistics that can be used in an incremental computation
-  // without themselves being recomputed. Populated in analyze().
-  private final List<TPartitionStats> validPartStats_ = Lists.newArrayList();
-
-  // For incremental computations, the list of partitions (identified by list of partition
-  // column values) that we expect to receive results for. Used to ensure that even empty
-  // partitions emit results.
-  // TODO: Consider using partition IDs (and adding them to the child queries with a
-  // PARTITION_ID() builtin)
-  private final List<List<String>> expectedPartitions_ = Lists.newArrayList();
-
-  // If non-null, the partition that an incremental computation might apply to. Must be
-  // null if this is a non-incremental computation.
-  private PartitionSpec partitionSpec_ = null;
-
-  // The maximum number of partitions that may be explicitly selected by filter
-  // predicates. Any query that selects more than this automatically drops back to a full
-  // incremental stats recomputation.
-  // TODO: We can probably do better than this, e.g. running several queries, each of
-  // which selects up to MAX_INCREMENTAL_PARTITIONS partitions.
-  private static final int MAX_INCREMENTAL_PARTITIONS = 1000;
-
-  /**
-   * Constructor for the non-incremental form of COMPUTE STATS.
-   */
-  protected ComputeStatsStmt(TableName tableName) {
-    this(tableName, false, null);
-  }
-
-  /**
-   * Constructor for the incremental form of COMPUTE STATS. If isIncremental is true,
-   * statistics will be recomputed incrementally; if false they will be recomputed for the
-   * whole table. The partition spec partSpec can specify a single partition whose stats
-   * should be recomputed.
- */ - protected ComputeStatsStmt(TableName tableName, boolean isIncremental, - PartitionSpec partSpec) { - Preconditions.checkState(tableName != null && !tableName.isEmpty()); - Preconditions.checkState(isIncremental || partSpec == null); - this.tableName_ = tableName; - this.table_ = null; - this.isIncremental_ = isIncremental; - this.partitionSpec_ = partSpec; - if (partitionSpec_ != null) { - partitionSpec_.setTableName(tableName); - partitionSpec_.setPrivilegeRequirement(Privilege.ALTER); - } - } - - /** - * Utility method for constructing the child queries to add partition columns to both a - * select list and a group-by list; the former are wrapped in a cast to a string. - */ - private void addPartitionCols(HdfsTable table, List<String> selectList, - List<String> groupByCols) { - for (int i = 0; i < table.getNumClusteringCols(); ++i) { - String colRefSql = ToSqlUtils.getIdentSql(table.getColumns().get(i).getName()); - groupByCols.add(colRefSql); - // For the select list, wrap the group by columns in a cast to string because - // the Metastore stores them as strings. - selectList.add(colRefSql); - } - } - - private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) { - List<String> columnStatsSelectList = Lists.newArrayList(); - // For Hdfs tables, exclude partition columns from stats gathering because Hive - // cannot store them as part of the non-partition column stats. For HBase tables, - // include the single clustering column (the row key). - int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols(); - final String ndvUda = isIncremental_ ? "NDV_NO_FINALIZE" : "NDV"; - - for (int i = startColIdx; i < table_.getColumns().size(); ++i) { - Column c = table_.getColumns().get(i); - Type type = c.getType(); - - // Ignore columns with an invalid/unsupported type. For example, complex types in - // an HBase-backed table will appear as invalid types. - if (!type.isValid() || !type.isSupported() - || c.getType().isComplexType()) { - continue; - } - // NDV approximation function. Add explicit alias for later identification when - // updating the Metastore. - String colRefSql = ToSqlUtils.getIdentSql(c.getName()); - columnStatsSelectList.add(ndvUda + "(" + colRefSql + ") AS " + colRefSql); - - if (COUNT_NULLS) { - // Count the number of NULL values. - columnStatsSelectList.add("COUNT(IF(" + colRefSql + " IS NULL, 1, NULL))"); - } else { - // Using -1 to indicate "unknown". We need cast to BIGINT because backend expects - // an i64Val as the number of NULLs returned by the COMPUTE STATS column stats - // child query. See CatalogOpExecutor::SetColumnStats(). If we do not cast, then - // the -1 will be treated as TINYINT resulting a 0 to be placed in the #NULLs - // column (see IMPALA-1068). - columnStatsSelectList.add("CAST(-1 as BIGINT)"); - } - - // For STRING columns also compute the max and avg string length. - if (type.isStringType()) { - columnStatsSelectList.add("MAX(length(" + colRefSql + "))"); - columnStatsSelectList.add("AVG(length(" + colRefSql + "))"); - } else { - // For non-STRING columns we use the fixed size of the type. - // We store the same information for all types to avoid having to - // treat STRING columns specially in the BE CatalogOpExecutor. 
-        Integer typeSize = type.getPrimitiveType().getSlotSize();
-        columnStatsSelectList.add(typeSize.toString());
-        columnStatsSelectList.add("CAST(" + typeSize.toString() + " as DOUBLE)");
-      }
-
-      if (isIncremental_) {
-        // Need the count in order to properly combine per-partition column stats
-        columnStatsSelectList.add("COUNT(" + colRefSql + ")");
-      }
-    }
-    return columnStatsSelectList;
-  }
-
-  /**
-   * Constructs two queries to compute statistics for 'tableName_', if that table exists
-   * (although if we can detect that no work needs to be done for either query, that query
-   * will be 'null' and not executed).
-   *
-   * The first query computes the number of rows (on a per-partition basis if the table is
-   * partitioned) and has the form "SELECT COUNT(*) FROM tbl GROUP BY part_col1,
-   * part_col2...", with an optional WHERE clause for incremental computation (see below).
-   *
-   * The second query computes the NDV estimate, the average width, the maximum width and,
-   * optionally, the number of nulls for each column. For non-partitioned tables (or
-   * non-incremental computations), the query is simple:
-   *
-   * SELECT NDV(col), COUNT(<nulls>), MAX(length(col)), AVG(length(col)) FROM tbl
-   *
-   * (For non-string columns, the widths are hard-coded as they are known at query
-   * construction time).
-   *
-   * If computation is incremental (i.e. the original statement was COMPUTE INCREMENTAL
-   * STATS.., and the underlying table is a partitioned HdfsTable), some modifications are
-   * made to the non-incremental per-column query. First, a different UDA,
-   * NDV_NO_FINALIZE() is used to retrieve and serialise the intermediate state from each
-   * column. Second, the results are grouped by partition, as with the row count query, so
-   * that the intermediate NDV computation state can be stored per-partition. The number
-   * of rows per partition is also recorded.
-   *
-   * For both the row count query and the column stats query, the query's WHERE clause is
-   * used to restrict execution only to partitions that actually require new statistics to
-   * be computed.
-   *
-   * SELECT NDV_NO_FINALIZE(col), <nulls, max, avg>, COUNT(col) FROM tbl
-   * WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR
-   *       ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...
-   * GROUP BY part_col1, part_col2, ...
-   */
-  @Override
-  public void analyze(Analyzer analyzer) throws AnalysisException {
-    table_ = analyzer.getTable(tableName_, Privilege.ALTER);
-    String sqlTableName = table_.getTableName().toSql();
-    if (table_ instanceof View) {
-      throw new AnalysisException(String.format(
-          "COMPUTE STATS not supported for view %s", sqlTableName));
-    }
-
-    if (!(table_ instanceof HdfsTable)) {
-      if (partitionSpec_ != null) {
-        throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not supported " +
-            "for non-HDFS table " + table_.getTableName());
-      }
-      isIncremental_ = false;
-    }
-
-    // Ensure that we write an entry for every partition if this isn't incremental
-    if (!isIncremental_) expectAllPartitions_ = true;
-
-    HdfsTable hdfsTable = null;
-    if (table_ instanceof HdfsTable) {
-      hdfsTable = (HdfsTable)table_;
-      if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
-          partitionSpec_ != null) {
-        throw new AnalysisException(String.format(
-            "Can't compute PARTITION stats on an unpartitioned table: %s",
-            sqlTableName));
-      } else if (partitionSpec_ != null) {
-        partitionSpec_.setPartitionShouldExist();
-        partitionSpec_.analyze(analyzer);
-        for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
-          // TODO: We could match the dynamic keys (i.e. as wildcards) as well, but that
-          // would involve looping over all partitions and seeing which match the
-          // partition spec.
-          if (!kv.isStatic()) {
-            throw new AnalysisException("All partition keys must have values: " +
-                kv.toString());
-          }
-        }
-      }
-      // For incremental stats, estimate the size of intermediate stats and report an
-      // error if the estimate is greater than MAX_INCREMENTAL_STATS_SIZE_BYTES.
-      if (isIncremental_) {
-        long statsSizeEstimate = hdfsTable.getColumns().size() *
-            hdfsTable.getPartitions().size() * HdfsTable.STATS_SIZE_PER_COLUMN_BYTES;
-        if (statsSizeEstimate > HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES) {
-          LOG.error("Incremental stats size estimate for table " + hdfsTable.getName() +
-              " exceeded " + HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES + ", estimate = "
-              + statsSizeEstimate);
-          throw new AnalysisException("Incremental stats size estimate exceeds "
-              + PrintUtils.printBytes(HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES)
-              + ". Please try COMPUTE STATS instead.");
-        }
-      }
-    }
-
-    // Build partition filters that only select partitions without valid statistics for
-    // incremental computation.
-    List<String> filterPreds = Lists.newArrayList();
-    if (isIncremental_) {
-      if (partitionSpec_ == null) {
-        // If any column does not have stats, we recompute statistics for all partitions
-        // TODO: need a better way to invalidate stats for all partitions, so that we can
-        // use this logic to only recompute new / changed columns.
-        boolean tableIsMissingColStats = false;
-
-        // We'll warn the user if a column is missing stats (and therefore we rescan the
-        // whole table), but if all columns are missing stats, the table just doesn't have
-        // any stats and there's no need to warn.
- boolean allColumnsMissingStats = true; - String exampleColumnMissingStats = null; - // Partition columns always have stats, so exclude them from this search - for (Column col: table_.getNonClusteringColumns()) { - if (!col.getStats().hasStats()) { - if (!tableIsMissingColStats) { - tableIsMissingColStats = true; - exampleColumnMissingStats = col.getName(); - } - } else { - allColumnsMissingStats = false; - } - } - - if (tableIsMissingColStats && !allColumnsMissingStats) { - analyzer.addWarning("Column " + exampleColumnMissingStats + - " does not have statistics, recomputing stats for the whole table"); - } - - for (HdfsPartition p: hdfsTable.getPartitions()) { - if (p.isDefaultPartition()) continue; - TPartitionStats partStats = p.getPartitionStats(); - if (!p.hasIncrementalStats() || tableIsMissingColStats) { - if (partStats == null) LOG.trace(p.toString() + " does not have stats"); - if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql()); - List<String> partValues = Lists.newArrayList(); - for (LiteralExpr partValue: p.getPartitionValues()) { - partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue, - "NULL")); - } - expectedPartitions_.add(partValues); - } else { - LOG.trace(p.toString() + " does have statistics"); - validPartStats_.add(partStats); - } - } - if (expectedPartitions_.size() == hdfsTable.getPartitions().size() - 1) { - expectedPartitions_.clear(); - expectAllPartitions_ = true; - } - } else { - // Always compute stats on a particular partition when told to. - List<String> partitionConjuncts = Lists.newArrayList(); - for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) { - partitionConjuncts.add(kv.toPredicateSql()); - } - filterPreds.add("(" + Joiner.on(" AND ").join(partitionConjuncts) + ")"); - HdfsPartition targetPartition = - hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues()); - List<String> partValues = Lists.newArrayList(); - for (LiteralExpr partValue: targetPartition.getPartitionValues()) { - partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue, - "NULL")); - } - expectedPartitions_.add(partValues); - for (HdfsPartition p: hdfsTable.getPartitions()) { - if (p.isDefaultPartition()) continue; - if (p == targetPartition) continue; - TPartitionStats partStats = p.getPartitionStats(); - if (partStats != null) validPartStats_.add(partStats); - } - } - - if (filterPreds.size() == 0 && validPartStats_.size() != 0) { - LOG.info("No partitions selected for incremental stats update"); - analyzer.addWarning("No partitions selected for incremental stats update"); - return; - } - } - - if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) { - // TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then - // advising the user to iterate. - analyzer.addWarning( - "Too many partitions selected, doing full recomputation of incremental stats"); - filterPreds.clear(); - validPartStats_.clear(); - } - - List<String> groupByCols = Lists.newArrayList(); - List<String> partitionColsSelectList = Lists.newArrayList(); - // Only add group by clause for HdfsTables. - if (hdfsTable != null) { - if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable); - addPartitionCols(hdfsTable, partitionColsSelectList, groupByCols); - } - - // Query for getting the per-partition row count and the total row count. 
- StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT "); - List<String> tableStatsSelectList = Lists.newArrayList(); - tableStatsSelectList.add("COUNT(*)"); - - tableStatsSelectList.addAll(partitionColsSelectList); - tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList)); - tableStatsQueryBuilder.append(" FROM " + sqlTableName); - - // Query for getting the per-column NDVs and number of NULLs. - List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer); - - if (isIncremental_) columnStatsSelectList.addAll(partitionColsSelectList); - - StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT "); - columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList)); - columnStatsQueryBuilder.append(" FROM " + sqlTableName); - - // Add the WHERE clause to filter out partitions that we don't want to compute - // incremental stats for. While this is a win in most situations, we would like to - // avoid this where it does no useful work (i.e. it selects all rows). This happens - // when there are no existing valid partitions (so all partitions will have been - // selected in) and there is no partition spec (so no single partition was explicitly - // selected in). - if (filterPreds.size() > 0 && - (validPartStats_.size() > 0 || partitionSpec_ != null)) { - String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds); - columnStatsQueryBuilder.append(filterClause); - tableStatsQueryBuilder.append(filterClause); - } - - if (groupByCols.size() > 0) { - String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols); - if (isIncremental_) columnStatsQueryBuilder.append(groupBy); - tableStatsQueryBuilder.append(groupBy); - } - - tableStatsQueryStr_ = tableStatsQueryBuilder.toString(); - LOG.debug("Table stats query: " + tableStatsQueryStr_); - - if (columnStatsSelectList.isEmpty()) { - // Table doesn't have any columns that we can compute stats for. - LOG.info("No supported column types in table " + table_.getTableName() + - ", no column statistics will be gathered."); - columnStatsQueryStr_ = null; - return; - } - - columnStatsQueryStr_ = columnStatsQueryBuilder.toString(); - LOG.debug("Column stats query: " + columnStatsQueryStr_); - } - - /** - * Checks whether the column definitions from the CREATE TABLE stmt match the columns - * in the Avro schema. If there is a mismatch, then COMPUTE STATS cannot update the - * statistics in the Metastore's backend DB due to HIVE-6308. Throws an - * AnalysisException for such ill-created Avro tables. Does nothing if - * the column definitions match the Avro schema exactly. - */ - private void checkIncompleteAvroSchema(HdfsTable table) throws AnalysisException { - Preconditions.checkState(table.isAvroTable()); - org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable(); - // The column definitions from 'CREATE TABLE (column definitions) ...' - Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator(); - // The columns derived from the Avro schema file or literal schema. - // Inconsistencies between the Avro-schema columns and the column definitions - // are sometimes resolved in the CREATE TABLE, and sometimes not (see below). - Iterator<Column> avroSchemaCols = table.getColumns().iterator(); - // Skip partition columns from 'table' since those are not present in - // the msTable field schemas. 
-    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
-      if (avroSchemaCols.hasNext()) avroSchemaCols.next();
-    }
-    int pos = 0;
-    while (colDefs.hasNext() || avroSchemaCols.hasNext()) {
-      if (colDefs.hasNext() && avroSchemaCols.hasNext()) {
-        FieldSchema colDef = colDefs.next();
-        Column avroSchemaCol = avroSchemaCols.next();
-        // Check that the column names are identical. Ignore mismatched types
-        // as those will either fail in the scan or succeed.
-        if (!colDef.getName().equalsIgnoreCase(avroSchemaCol.getName())) {
-          throw new AnalysisException(
-              String.format(AVRO_SCHEMA_MSG_PREFIX +
-                  "\nDefinition of column '%s' of type '%s' does not match " +
-                  "the Avro-schema column '%s' of type '%s' at position '%s'.\n" +
-                  AVRO_SCHEMA_MSG_SUFFIX,
-                  table.getName(), colDef.getName(), colDef.getType(),
-                  avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
-        }
-      }
-      // The following two cases are typically not possible because Hive resolves
-      // inconsistencies between the column-definition list and the Avro schema if a
-      // column-definition list was given in the CREATE TABLE (having no column
-      // definitions at all results in HIVE-6308). Even so, we check these cases for
-      // extra safety. COMPUTE STATS could be made to succeed in special instances of
-      // the cases below but we chose to throw an AnalysisException to avoid confusion
-      // because this scenario "should" never arise as mentioned above.
-      if (colDefs.hasNext() && !avroSchemaCols.hasNext()) {
-        FieldSchema colDef = colDefs.next();
-        throw new AnalysisException(
-            String.format(AVRO_SCHEMA_MSG_PREFIX +
-                "\nMissing Avro-schema column corresponding to column " +
-                "definition '%s' of type '%s' at position '%s'.\n" +
-                AVRO_SCHEMA_MSG_SUFFIX,
-                table.getName(), colDef.getName(), colDef.getType(), pos));
-      }
-      if (!colDefs.hasNext() && avroSchemaCols.hasNext()) {
-        Column avroSchemaCol = avroSchemaCols.next();
-        throw new AnalysisException(
-            String.format(AVRO_SCHEMA_MSG_PREFIX +
-                "\nMissing column definition corresponding to Avro-schema " +
-                "column '%s' of type '%s' at position '%s'.\n" +
-                AVRO_SCHEMA_MSG_SUFFIX,
-                table.getName(), avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
-      }
-      ++pos;
-    }
-  }
-
-  public String getTblStatsQuery() { return tableStatsQueryStr_; }
-  public String getColStatsQuery() { return columnStatsQueryStr_; }
-
-  @Override
-  public String toSql() {
-    if (!isIncremental_) {
-      return "COMPUTE STATS " + tableName_.toSql();
-    } else {
-      return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() +
-          (partitionSpec_ == null ? "" : partitionSpec_.toSql());
-    }
-  }
-
-  public TComputeStatsParams toThrift() {
-    TComputeStatsParams params = new TComputeStatsParams();
-    params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName()));
-    params.setTbl_stats_query(tableStatsQueryStr_);
-    if (columnStatsQueryStr_ != null) {
-      params.setCol_stats_query(columnStatsQueryStr_);
-    } else {
-      params.setCol_stats_queryIsSet(false);
-    }
-
-    params.setIs_incremental(isIncremental_);
-    params.setExisting_part_stats(validPartStats_);
-    params.setExpect_all_partitions(expectAllPartitions_);
-    if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_);
-    if (isIncremental_) {
-      params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
-    }
-    return params;
-  }
-}
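To make the query construction above concrete: for a hypothetical table sales (id BIGINT, name STRING), partitioned by an INT column year, the two generated child queries would look roughly as follows. The clause shapes follow the javadoc and select-list code above (NDV_NO_FINALIZE, the CAST(-1 as BIGINT) null-count placeholder, hard-coded widths for non-string columns, and the per-partition COUNT); the table, columns, and partition value are invented, not output captured from Impala:

-- Row count query, per partition, restricted by the incremental filter:
SELECT COUNT(*), year
FROM sales
WHERE (year = 2015)
GROUP BY year

-- Incremental column stats query:
SELECT NDV_NO_FINALIZE(id) AS id, CAST(-1 as BIGINT), 8, CAST(8 as DOUBLE), COUNT(id),
       NDV_NO_FINALIZE(name) AS name, CAST(-1 as BIGINT),
       MAX(length(name)), AVG(length(name)), COUNT(name), year
FROM sales
WHERE (year = 2015)
GROUP BY year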
"" : partitionSpec_.toSql(); - } - } - - public TComputeStatsParams toThrift() { - TComputeStatsParams params = new TComputeStatsParams(); - params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName())); - params.setTbl_stats_query(tableStatsQueryStr_); - if (columnStatsQueryStr_ != null) { - params.setCol_stats_query(columnStatsQueryStr_); - } else { - params.setCol_stats_queryIsSet(false); - } - - params.setIs_incremental(isIncremental_); - params.setExisting_part_stats(validPartStats_); - params.setExpect_all_partitions(expectAllPartitions_); - if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_); - if (isIncremental_) { - params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols()); - } - return params; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java deleted file mode 100644 index 1ee6fd4..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; - -import com.cloudera.impala.authorization.Privilege; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.extdatasource.ApiVersion; -import com.cloudera.impala.thrift.TCreateDataSourceParams; -import com.cloudera.impala.thrift.TDataSource; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; - -/** - * Represents a CREATE DATA SOURCE statement. 
- */ -public class CreateDataSrcStmt extends StatementBase { - private final String dataSrcName_; - private final String className_; - private final String apiVersionString_; - private final HdfsUri location_; - private final boolean ifNotExists_; - private ApiVersion apiVersion_; - - public CreateDataSrcStmt(String dataSrcName, HdfsUri location, String className, - String apiVersionString, boolean ifNotExists) { - Preconditions.checkNotNull(dataSrcName); - Preconditions.checkNotNull(className); - Preconditions.checkNotNull(apiVersionString); - Preconditions.checkNotNull(location); - dataSrcName_ = dataSrcName.toLowerCase(); - location_ = location; - className_ = className; - apiVersionString_ = apiVersionString; - ifNotExists_ = ifNotExists; - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - if (!MetaStoreUtils.validateName(dataSrcName_)) { - throw new AnalysisException("Invalid data source name: " + dataSrcName_); - } - if (!ifNotExists_ && analyzer.getCatalog().getDataSource(dataSrcName_) != null) { - throw new AnalysisException(Analyzer.DATA_SRC_ALREADY_EXISTS_ERROR_MSG + - dataSrcName_); - } - - apiVersion_ = ApiVersion.parseApiVersion(apiVersionString_); - if (apiVersion_ == null) { - throw new AnalysisException("Invalid API version: '" + apiVersionString_ + - "'. Valid API versions: " + Joiner.on(", ").join(ApiVersion.values())); - } - - location_.analyze(analyzer, Privilege.ALL, FsAction.READ); - // TODO: Check class exists and implements API version - // TODO: authorization check - } - - @Override - public String toSql() { - StringBuilder sb = new StringBuilder(); - sb.append("CREATE DATA SOURCE "); - if (ifNotExists_) sb.append("IF NOT EXISTS "); - sb.append(dataSrcName_); - sb.append(" LOCATION '"); - sb.append(location_.getLocation()); - sb.append("' CLASS '"); - sb.append(className_); - sb.append("' API_VERSION '"); - sb.append(apiVersion_.name()); - sb.append("'"); - return sb.toString(); - } - - public TCreateDataSourceParams toThrift() { - return new TCreateDataSourceParams( - new TDataSource(dataSrcName_, location_.toString(), className_, - apiVersion_.name())).setIf_not_exists(ifNotExists_); - } -}
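For a concrete sense of the string CreateDataSrcStmt.toSql() assembles, here is a small runnable approximation; the data source name, jar path, and class name are invented for illustration, and 'V1' assumes the ApiVersion enum exposes a constant of that name:

    public class CreateDataSrcSqlDemo {
      static String toSql(String name, String location, String className,
          String apiVersion, boolean ifNotExists) {
        StringBuilder sb = new StringBuilder("CREATE DATA SOURCE ");
        if (ifNotExists) sb.append("IF NOT EXISTS ");
        sb.append(name.toLowerCase());  // the constructor lower-cases the name
        sb.append(" LOCATION '").append(location).append("'");
        sb.append(" CLASS '").append(className).append("'");
        sb.append(" API_VERSION '").append(apiVersion).append("'");
        return sb.toString();
      }

      public static void main(String[] args) {
        System.out.println(toSql("MySrc", "/user/impala/my-data-source.jar",
            "com.example.MyDataSource", "V1", true));
        // CREATE DATA SOURCE IF NOT EXISTS mysrc
        //   LOCATION '/user/impala/my-data-source.jar'
        //   CLASS 'com.example.MyDataSource' API_VERSION 'V1'   (one line)
      }
    }

Note that the real method emits apiVersion_.name(), i.e. the canonical name of the parsed enum constant, not the raw string the user typed.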
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java deleted file mode 100644 index 3dedd8b..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java +++ /dev/null @@ -1,102 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; - -import com.cloudera.impala.authorization.Privilege; -import com.cloudera.impala.catalog.Db; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.thrift.TCreateDbParams; - -/** - * Represents a CREATE DATABASE statement. - */ -public class CreateDbStmt extends StatementBase { - private final String dbName_; - private final HdfsUri location_; - private final String comment_; - private final boolean ifNotExists_; - - /** - * Creates a database with the given name. - */ - public CreateDbStmt(String dbName) { - this(dbName, null, null, false); - } - - /** - * Creates a database with the given name, comment, and HDFS table storage location. - * New tables created in the database inherit the location property for their default - * storage location. Creating the database throws an error if the database already - * exists, unless ifNotExists is true. - */ - public CreateDbStmt(String dbName, String comment, HdfsUri location, - boolean ifNotExists) { - this.dbName_ = dbName; - this.comment_ = comment; - this.location_ = location; - this.ifNotExists_ = ifNotExists; - } - - public String getComment() { return comment_; } - public String getDb() { return dbName_; } - public boolean getIfNotExists() { return ifNotExists_; } - public HdfsUri getLocation() { return location_; } - - @Override - public String toSql() { - StringBuilder sb = new StringBuilder("CREATE DATABASE"); - if (ifNotExists_) sb.append(" IF NOT EXISTS"); - sb.append(" " + dbName_); - if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'"); - if (location_ != null) sb.append(" LOCATION '" + location_ + "'"); - return sb.toString(); - } - - public TCreateDbParams toThrift() { - TCreateDbParams params = new TCreateDbParams(); - params.setDb(getDb()); - params.setComment(getComment()); - params.setLocation(location_ == null ? null : location_.toString()); - params.setIf_not_exists(getIfNotExists()); - return params; - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - // Check whether the db name meets the Metastore's requirements. - if (!MetaStoreUtils.validateName(dbName_)) { - throw new AnalysisException("Invalid database name: " + dbName_); - } - - // Note: It is possible that a database with the same name was created external to - // this Impala instance. If that happens, the caller will not get an - // AnalysisException when creating the database, they will get a Hive - // AlreadyExistsException once the request has been sent to the metastore.
- Db db = analyzer.getDb(getDb(), Privilege.CREATE, false); - if (db != null && !ifNotExists_) { - throw new AnalysisException(Analyzer.DB_ALREADY_EXISTS_ERROR_MSG + getDb()); - } - - if (location_ != null) { - location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); - } - } -}
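CreateDbStmt.toSql() above relies on each appended fragment carrying its own leading space, so the optional clauses concatenate cleanly. A runnable sketch with hypothetical values:

    public class CreateDbSqlDemo {
      public static void main(String[] args) {
        String dbName = "sales";                 // hypothetical
        String comment = "nightly sales marts";  // hypothetical
        boolean ifNotExists = true;
        StringBuilder sb = new StringBuilder("CREATE DATABASE");
        if (ifNotExists) sb.append(" IF NOT EXISTS");
        sb.append(" " + dbName);
        if (comment != null) sb.append(" COMMENT '" + comment + "'");
        System.out.println(sb);
        // CREATE DATABASE IF NOT EXISTS sales COMMENT 'nightly sales marts'
      }
    }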
"DROP" : "CREATE"); - } - - public TCreateDropRoleParams toThrift() { - TCreateDropRoleParams params = new TCreateDropRoleParams(); - params.setRole_name(roleName_); - params.setIs_drop(isDropRole_); - return params; - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - super.analyze(analyzer); - Role existingRole = analyzer.getCatalog().getAuthPolicy().getRole(roleName_); - if (isDropRole_ && existingRole == null) { - throw new AnalysisException(String.format("Role '%s' does not exist.", roleName_)); - } else if (!isDropRole_ && existingRole != null) { - throw new AnalysisException(String.format("Role '%s' already exists.", roleName_)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java deleted file mode 100644 index ebfd7b6..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java +++ /dev/null @@ -1,206 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.analysis; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.hadoop.fs.permission.FsAction; - -import com.cloudera.impala.authorization.AuthorizeableFn; -import com.cloudera.impala.authorization.Privilege; -import com.cloudera.impala.authorization.PrivilegeRequest; -import com.cloudera.impala.catalog.Catalog; -import com.cloudera.impala.catalog.Db; -import com.cloudera.impala.catalog.Function; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.thrift.TCreateFunctionParams; -import com.cloudera.impala.thrift.TFunctionBinaryType; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Base class for CREATE [] FUNCTION. - */ -public abstract class CreateFunctionStmtBase extends StatementBase { - - // Enums for valid keys for optional arguments. 
- public enum OptArg { - COMMENT, - SYMBOL, // Only used for Udfs - PREPARE_FN, // Only used for Udfs - CLOSE_FN, // Only used for Udfs - UPDATE_FN, // Only used for Udas - INIT_FN, // Only used for Udas - SERIALIZE_FN, // Only used for Udas - MERGE_FN, // Only used for Udas - FINALIZE_FN // Only used for Udas - }; - - protected final FunctionName fnName_; - protected final FunctionArgs args_; - protected final TypeDef retTypeDef_; - protected final HdfsUri location_; - protected final HashMap<CreateFunctionStmtBase.OptArg, String> optArgs_; - protected final boolean ifNotExists_; - - // Result of analysis. - protected Function fn_; - - // Db object for function fn_. Set in analyze(). - protected Db db_; - - // Set in analyze() - protected String sqlString_; - - protected CreateFunctionStmtBase(FunctionName fnName, FunctionArgs args, - TypeDef retTypeDef, HdfsUri location, boolean ifNotExists, - HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) { - // The return and arg types must either be both null or non-null. - Preconditions.checkState(!(args == null ^ retTypeDef == null)); - fnName_ = fnName; - args_ = args; - retTypeDef_ = retTypeDef; - location_ = location; - ifNotExists_ = ifNotExists; - optArgs_ = optArgs; - } - - public String getComment() { return optArgs_.get(OptArg.COMMENT); } - public boolean getIfNotExists() { return ifNotExists_; } - public boolean hasSignature() { return args_ != null; } - - public TCreateFunctionParams toThrift() { - TCreateFunctionParams params = new TCreateFunctionParams(fn_.toThrift()); - params.setIf_not_exists(getIfNotExists()); - params.setFn(fn_.toThrift()); - return params; - } - - // Returns optArg[key], first validating that it is set. - protected String checkAndGetOptArg(OptArg key) - throws AnalysisException { - if (!optArgs_.containsKey(key)) { - throw new AnalysisException("Argument '" + key + "' must be set."); - } - return optArgs_.get(key); - } - - protected void checkOptArgNotSet(OptArg key) - throws AnalysisException { - if (optArgs_.containsKey(key)) { - throw new AnalysisException("Optional argument '" + key + "' should not be set."); - } - } - - // Returns the function's binary type based on the path extension. - private TFunctionBinaryType getBinaryType() throws AnalysisException { - TFunctionBinaryType binaryType = null; - String binaryPath = fn_.getLocation().getLocation(); - int suffixIndex = binaryPath.lastIndexOf("."); - if (suffixIndex != -1) { - String suffix = binaryPath.substring(suffixIndex + 1); - if (suffix.equalsIgnoreCase("jar")) { - binaryType = TFunctionBinaryType.JAVA; - } else if (suffix.equalsIgnoreCase("so")) { - binaryType = TFunctionBinaryType.NATIVE; - } else if (suffix.equalsIgnoreCase("ll")) { - binaryType = TFunctionBinaryType.IR; - } - } - if (binaryType == null) { - throw new AnalysisException("Unknown binary type: '" + binaryPath + - "'. Binary must end in .jar, .so or .ll"); - } - return binaryType; - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - // Validate function name is legal - fnName_.analyze(analyzer); - - if (hasSignature()) { - // Validate function arguments and return type. - args_.analyze(analyzer); - retTypeDef_.analyze(analyzer); - fn_ = createFunction(fnName_, args_.getArgTypes(), retTypeDef_.getType(), - args_.hasVarArgs()); - } else { - fn_ = createFunction(fnName_, null, null, false); - } - - // For now, if authorization is enabled, the user needs ALL on the server - // to create functions. 
- // TODO: this is not the right granularity but acceptable for now. - analyzer.registerPrivReq(new PrivilegeRequest( - new AuthorizeableFn(fn_.signatureString()), Privilege.ALL)); - - Db builtinsDb = analyzer.getCatalog().getDb(Catalog.BUILTINS_DB); - if (builtinsDb.containsFunction(fn_.getName())) { - throw new AnalysisException("Function cannot have the same name as a builtin: " + - fn_.getFunctionName().getFunction()); - } - - db_ = analyzer.getDb(fn_.dbName(), Privilege.CREATE); - Function existingFn = db_.getFunction(fn_, Function.CompareMode.IS_INDISTINGUISHABLE); - if (existingFn != null && !ifNotExists_) { - throw new AnalysisException(Analyzer.FN_ALREADY_EXISTS_ERROR_MSG + - existingFn.signatureString()); - } - - location_.analyze(analyzer, Privilege.CREATE, FsAction.READ); - fn_.setLocation(location_); - - // Check the file type from the binary type to infer the type of the UDA - fn_.setBinaryType(getBinaryType()); - - // Forbid unsupported and complex types. - if (hasSignature()) { - List<Type> refdTypes = Lists.newArrayList(fn_.getReturnType()); - refdTypes.addAll(Lists.newArrayList(fn_.getArgs())); - for (Type t: refdTypes) { - if (!t.isSupported() || t.isComplexType()) { - throw new AnalysisException( - String.format("Type '%s' is not supported in UDFs/UDAs.", t.toSql())); - } - } - } else if (fn_.getBinaryType() != TFunctionBinaryType.JAVA) { - throw new AnalysisException( - String.format("Native functions require a return type and/or " + - "argument types: %s", fn_.getFunctionName())); - } - - // Check if the function can be persisted. We persist all native/IR functions - // and also JAVA functions added without signature. Only JAVA functions added - // with signatures aren't persisted. - if (getBinaryType() == TFunctionBinaryType.JAVA && hasSignature()) { - fn_.setIsPersistent(false); - } else { - fn_.setIsPersistent(true); - } - } - - /** - * Creates a concrete function. - */ - protected abstract Function createFunction(FunctionName fnName, - ArrayList<Type> argTypes, Type retType, boolean hasVarArgs); -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java deleted file mode 100644 index cc04b04..0000000 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java +++ /dev/null @@ -1,209 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.analysis; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.thrift.TCreateOrAlterViewParams; -import com.cloudera.impala.thrift.TTableName; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * Base class for CREATE VIEW and ALTER VIEW AS SELECT statements. - */ -public abstract class CreateOrAlterViewStmtBase extends StatementBase { - private final static Logger LOG = - LoggerFactory.getLogger(CreateOrAlterViewStmtBase.class); - - protected final boolean ifNotExists_; - protected final TableName tableName_; - protected final ArrayList<ColumnDef> columnDefs_; - protected final String comment_; - protected final QueryStmt viewDefStmt_; - - // Set during analysis - protected String dbName_; - protected String owner_; - - // The original SQL-string given as view definition. Set during analysis. - // Corresponds to Hive's viewOriginalText. - protected String originalViewDef_; - - // Query statement (as SQL string) that defines the View for view substitution. - // It is a transformation of the original view definition, e.g., to enforce the - // columnDefs even if the original view definition has explicit column aliases. - // If column definitions were given, then this "expanded" view definition - // wraps the original view definition in a select stmt as follows. - // - // SELECT viewName.origCol1 AS colDesc1, viewName.origCol2 AS colDesc2, ... - // FROM (originalViewDef) AS viewName - // - // Corresponds to Hive's viewExpandedText, but is not identical to the SQL - // Hive would produce in view creation. - protected String inlineViewDef_; - - // Columns to use in the select list of the expanded SQL string and when registering - // this view in the metastore. Set in analysis. - protected ArrayList<ColumnDef> finalColDefs_; - - public CreateOrAlterViewStmtBase(boolean ifNotExists, TableName tableName, - ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) { - Preconditions.checkNotNull(tableName); - Preconditions.checkNotNull(viewDefStmt); - this.ifNotExists_ = ifNotExists; - this.tableName_ = tableName; - this.columnDefs_ = columnDefs; - this.comment_ = comment; - this.viewDefStmt_ = viewDefStmt; - } - - /** - * Sets the originalViewDef and the expanded inlineViewDef based on viewDefStmt. - * If columnDefs were given, checks that they do not contain duplicate column names - * and throws an exception if they do. - */ - protected void createColumnAndViewDefs(Analyzer analyzer) throws AnalysisException { - Preconditions.checkNotNull(dbName_); - Preconditions.checkNotNull(owner_); - - // Set the finalColDefs to reflect the given column definitions. - if (columnDefs_ != null) { - Preconditions.checkState(!columnDefs_.isEmpty()); - if (columnDefs_.size() != viewDefStmt_.getColLabels().size()) { - String cmp = - (columnDefs_.size() > viewDefStmt_.getColLabels().size()) ? 
"more" : "fewer"; - throw new AnalysisException(String.format("Column-definition list has " + - "%s columns (%s) than the view-definition query statement returns (%s).", - cmp, columnDefs_.size(), viewDefStmt_.getColLabels().size())); - } - - finalColDefs_ = columnDefs_; - Preconditions.checkState( - columnDefs_.size() == viewDefStmt_.getBaseTblResultExprs().size()); - for (int i = 0; i < columnDefs_.size(); ++i) { - // Set type in the column definition from the view-definition statement. - columnDefs_.get(i).setType(viewDefStmt_.getBaseTblResultExprs().get(i).getType()); - } - } else { - // Create list of column definitions from the view-definition statement. - finalColDefs_ = Lists.newArrayList(); - List<Expr> exprs = viewDefStmt_.getBaseTblResultExprs(); - List<String> labels = viewDefStmt_.getColLabels(); - Preconditions.checkState(exprs.size() == labels.size()); - for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) { - ColumnDef colDef = new ColumnDef(labels.get(i), null, null); - colDef.setType(exprs.get(i).getType()); - finalColDefs_.add(colDef); - } - } - - // Check that the column definitions have valid names, and that there are no - // duplicate column names. - Set<String> distinctColNames = Sets.newHashSet(); - for (ColumnDef colDesc: finalColDefs_) { - colDesc.analyze(); - if (!distinctColNames.add(colDesc.getColName().toLowerCase())) { - throw new AnalysisException("Duplicate column name: " + colDesc.getColName()); - } - } - - // Set original and expanded view-definition SQL strings. - originalViewDef_ = viewDefStmt_.toSql(); - - // If no column definitions were given, then the expanded view SQL is the same - // as the original one. - if (columnDefs_ == null) { - inlineViewDef_ = originalViewDef_; - return; - } - - // Wrap the original view-definition statement into a SELECT to enforce the - // given column definitions. - StringBuilder sb = new StringBuilder(); - sb.append("SELECT "); - for (int i = 0; i < finalColDefs_.size(); ++i) { - String colRef = ToSqlUtils.getIdentSql(viewDefStmt_.getColLabels().get(i)); - String colAlias = ToSqlUtils.getIdentSql(finalColDefs_.get(i).getColName()); - sb.append(String.format("%s.%s AS %s", tableName_.getTbl(), colRef, colAlias)); - sb.append((i+1 != finalColDefs_.size()) ? ", " : ""); - } - // Do not use 'AS' for table aliases because Hive only accepts them without 'AS'. - sb.append(String.format(" FROM (%s) %s", originalViewDef_, tableName_.getTbl())); - inlineViewDef_ = sb.toString(); - } - - /** - * Computes the column lineage graph for a create/alter view statetement. - */ - protected void computeLineageGraph(Analyzer analyzer) { - ColumnLineageGraph graph = analyzer.getColumnLineageGraph(); - List<String> colDefs = Lists.newArrayList(); - for (ColumnDef colDef: finalColDefs_) { - colDefs.add(dbName_ + "." + getTbl() + "." 
+ colDef.getColName()); - } - graph.addTargetColumnLabels(colDefs); - graph.computeLineageGraph(viewDefStmt_.getResultExprs(), analyzer); - LOG.trace("lineage: " + graph.debugString()); - } - - public TCreateOrAlterViewParams toThrift() { - TCreateOrAlterViewParams params = new TCreateOrAlterViewParams(); - params.setView_name(new TTableName(getDb(), getTbl())); - for (ColumnDef col: finalColDefs_) { - params.addToColumns(col.toThrift()); - } - params.setOwner(getOwner()); - params.setIf_not_exists(getIfNotExists()); - params.setOriginal_view_def(originalViewDef_); - params.setExpanded_view_def(inlineViewDef_); - if (comment_ != null) params.setComment(comment_); - return params; - } - - /** - * Can only be called after analysis, returns the name of the database the view will - * be created within. - */ - public String getDb() { - Preconditions.checkNotNull(dbName_); - return dbName_; - } - - /** - * Can only be called after analysis, returns the owner of the view to be created. - */ - public String getOwner() { - Preconditions.checkNotNull(owner_); - return owner_; - } - - public List<ColumnDef> getColumnDescs() { return columnDefs_; } - public String getComment() { return comment_; } - public boolean getIfNotExists() { return ifNotExists_; } - public String getOriginalViewDef() { return originalViewDef_; } - public String getInlineViewDef() { return inlineViewDef_; } - public String getTbl() { return tableName_.getTbl(); } -}
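To make the expansion performed by createColumnAndViewDefs() above concrete, consider a hypothetical CREATE VIEW v (a, b) AS SELECT x, y FROM t. The two SQL strings the class stores would be:

    originalViewDef_:  SELECT x, y FROM t
    inlineViewDef_:    SELECT v.x AS a, v.y AS b FROM (SELECT x, y FROM t) v

The inline form enforces the user-supplied column names a and b over the query's own labels, and, as the comment in the code notes, the table alias v is emitted without AS because Hive only accepts table aliases in that form.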

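Looking back at CreateFunctionStmtBase.getBinaryType(): the binary type of a UDF/UDA is inferred solely from the file extension of its LOCATION, and that result also drives the persistence rule at the end of analyze() (everything is persisted except Java functions created with an explicit signature). A standalone sketch of the suffix mapping, where the local enum stands in for the generated thrift TFunctionBinaryType:

    public class BinaryTypeDemo {
      enum BinaryType { JAVA, NATIVE, IR }

      static BinaryType fromPath(String path) {
        int dot = path.lastIndexOf('.');
        String suffix = (dot == -1) ? "" : path.substring(dot + 1);
        if (suffix.equalsIgnoreCase("jar")) return BinaryType.JAVA;
        if (suffix.equalsIgnoreCase("so")) return BinaryType.NATIVE;
        if (suffix.equalsIgnoreCase("ll")) return BinaryType.IR;
        throw new IllegalArgumentException("Unknown binary type: '" + path +
            "'. Binary must end in .jar, .so or .ll");
      }

      public static void main(String[] args) {
        System.out.println(fromPath("/udfs/libudf.so"));   // NATIVE
        System.out.println(fromPath("/udfs/my-udfs.jar")); // JAVA
        System.out.println(fromPath("/udfs/udf-ir.ll"));   // IR
      }
    }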