http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
new file mode 100644
index 0000000..a00bf53
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -0,0 +1,680 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.impala.catalog.Table;
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+import com.cloudera.impala.thrift.TEdgeType;
+import com.cloudera.impala.thrift.TQueryCtx;
+import com.cloudera.impala.thrift.TLineageGraph;
+import com.cloudera.impala.thrift.TMultiEdge;
+import com.cloudera.impala.thrift.TVertex;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ * Represents a vertex in the column lineage graph. A Vertex may correspond to a base
+ * table column, a column in the destination table (for the case of INSERT or CTAS
+ * queries) or a result expr (labeled column of a query result set).
+ */
+final class Vertex implements Comparable<Vertex> {
+  // Unique identifier of this vertex.
+  private final VertexId id_;
+
+  private final String type_ = "COLUMN";
+
+  // A fully-qualified column name or the label of a result expr
+  private final String label_;
+
+  public Vertex(VertexId id, String label) {
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(label);
+    id_ = id;
+    label_ = label;
+  }
+  public VertexId getVertexId() { return id_; }
+  public String getLabel() { return label_; }
+  public String getType() { return type_; }
+
+  @Override
+  public String toString() { return "(" + id_ + ":" + type_ + ":" + label_ + ")"; }
+
+  /**
+   * Encodes this Vertex object into a JSON object represented by a Map.
+   */
+  public Map toJson() {
+    // Use a LinkedHashMap to generate a strict ordering of elements.
+    Map obj = new LinkedHashMap();
+    obj.put("id", id_.asInt());
+    obj.put("vertexType", type_);
+    obj.put("vertexId", label_);
+    return obj;
+  }
+
+  /**
+   * Constructs a Vertex object from a JSON object. The new object is returned.
+   */
+  public static Vertex fromJsonObj(JSONObject obj) {
+    int id = ((Long) obj.get("id")).intValue();
+    String label = (String) obj.get("vertexId");
+    return new Vertex(new VertexId(id), label);
+  }
+
+  /**
+   * Encodes this Vertex object into a thrift object
+   */
+  public TVertex toThrift() {
+    return new TVertex(id_.asInt(), label_);
+  }
+
+  /**
+   * Constructs a Vertex object from a thrift object.
+   */
+  public static Vertex fromThrift(TVertex vertex) {
+    int id = ((Long) vertex.id).intValue();
+    return new Vertex(new VertexId(id), vertex.label);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (obj.getClass() != this.getClass()) return false;
+    Vertex vertex = (Vertex) obj;
+    return this.id_.equals(vertex.id_) &&
+        this.label_.equals(vertex.label_);
+  }
+
+  public int compareTo(Vertex cmp) { return this.id_.compareTo(cmp.id_); }
+
+  @Override
+  public int hashCode() { return id_.hashCode(); }
+}
+
+/**
+ * Represents the unique identifier of a Vertex.
+ */
+class VertexId extends Id<VertexId> {
+  protected VertexId(int id) {
+    super(id);
+  }
+  public static IdGenerator<VertexId> createGenerator() {
+    return new IdGenerator<VertexId>() {
+      @Override
+      public VertexId getNextId() { return new VertexId(nextId_++); }
+      @Override
+      public VertexId getMaxId() { return new VertexId(nextId_ - 1); }
+    };
+  }
+}
+
+/**
+ * Represents a set of uni-directional edges in the column lineage graph, one edge from
+ * every source Vertex in 'sources_' to every target Vertex in 'targets_'. An edge
+ * indicates a dependency between a source and a target Vertex. There are two types of
+ * edges, PROJECTION and PREDICATE, that are described in the ColumnLineageGraph class.
+ */
+final class MultiEdge {
+  public static enum EdgeType {
+    PROJECTION, PREDICATE
+  }
+  private final Set<Vertex> sources_;
+  private final Set<Vertex> targets_;
+  private final EdgeType edgeType_;
+
+  public MultiEdge(Set<Vertex> sources, Set<Vertex> targets, EdgeType type) {
+    sources_ = sources;
+    targets_ = targets;
+    edgeType_ = type;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    Joiner joiner = Joiner.on(",");
+    builder.append("Sources: [");
+    builder.append(joiner.join(sources_) + "]\n");
+    builder.append("Targets: [");
+    builder.append(joiner.join(targets_) + "]\n");
+    builder.append("Type: " + edgeType_);
+    return builder.toString();
+  }
+
+  /**
+   * Encodes this MultiEdge object to a JSON object represented by a Map.
+   */
+  public Map toJson() {
+    Map obj = new LinkedHashMap();
+    // Add sources
+    JSONArray sourceIds = new JSONArray();
+    for (Vertex vertex: sources_) {
+      sourceIds.add(vertex.getVertexId());
+    }
+    obj.put("sources", sourceIds);
+    // Add targets
+    JSONArray targetIds = new JSONArray();
+    for (Vertex vertex: targets_) {
+      targetIds.add(vertex.getVertexId());
+    }
+    obj.put("targets", targetIds);
+    obj.put("edgeType", edgeType_.toString());
+    return obj;
+  }
+
+  /**
+   * Encodes this MultiEdge object to a thrift object
+   */
+  public TMultiEdge toThrift() {
+    List<TVertex> sources = Lists.newArrayList();
+    for (Vertex vertex: sources_) {
+      sources.add(vertex.toThrift());
+    }
+    List<TVertex> targets = Lists.newArrayList();
+    for (Vertex vertex: targets_) {
+      targets.add(vertex.toThrift());
+    }
+    if (edgeType_ == EdgeType.PROJECTION) {
+      return new TMultiEdge(sources, targets, TEdgeType.PROJECTION);
+    }
+    return new TMultiEdge(sources, targets, TEdgeType.PREDICATE);
+  }
+
+  /**
+   * Constructs a MultiEdge object from a thrift object
+   */
+  public static MultiEdge fromThrift(TMultiEdge obj){
+    Set<Vertex> sources = Sets.newHashSet();
+    for (TVertex vertex: obj.sources) {
+      sources.add(Vertex.fromThrift(vertex));
+    }
+    Set<Vertex> targets = Sets.newHashSet();
+    for (TVertex vertex: obj.targets) {
+      targets.add(Vertex.fromThrift(vertex));
+    }
+    if (obj.edgetype == TEdgeType.PROJECTION) {
+      return new MultiEdge(sources, targets, EdgeType.PROJECTION);
+    }
+    return new MultiEdge(sources, targets, EdgeType.PREDICATE);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (obj.getClass() != this.getClass()) return false;
+    MultiEdge edge = (MultiEdge) obj;
+    return edge.sources_.equals(this.sources_) &&
+        edge.targets_.equals(this.targets_) &&
+        edge.edgeType_ == this.edgeType_;
+  }
+}
+
+/**
+ * Represents the column lineage graph of a query. This is a directional graph that is
+ * used to track dependencies among the table/column entities that participate in
+ * a query. There are two types of dependencies that are represented as edges in the
+ * column lineage graph:
+ * a) Projection dependency: This is a dependency between a set of source
+ * columns (base table columns) and a single target (result expr or table column).
+ * This dependency indicates that values of the target depend on the values of the source
+ * columns.
+ * b) Predicate dependency: This is a dependency between a set of target
+ * columns (or exprs) and a set of source columns (base table columns). It indicates that
+ * the source columns restrict the values of their targets (e.g. by participating in
+ * WHERE clause predicates).
+ *
+ * The following dependencies are generated for a query:
+ * - Exactly one projection dependency for every result expr / target column.
+ * - Exactly one predicate dependency that targets all result exprs / target cols and
+ *   depends on all columns participating in a conjunct in the query.
+ * - Special case of analytic fns: One predicate dependency per result expr / target col
+ *   whose value is directly or indirectly affected by an analytic function with a
+ *   partition by and/or order by clause.
+ */
+public class ColumnLineageGraph {
+  private final static Logger LOG = LoggerFactory.getLogger(ColumnLineageGraph.class);
+  // Query statement
+  private String queryStr_;
+
+  // Name of the user that issued this query
+  private String user_;
+
+  private final List<Expr> resultDependencyPredicates_ = Lists.newArrayList();
+
+  private final List<MultiEdge> edges_ = Lists.newArrayList();
+
+  // Timestamp in seconds since epoch (GMT) this query was submitted for execution.
+  private long timestamp_;
+
+  // Map of Vertex labels to Vertex objects.
+  private final Map<String, Vertex> vertices_ = Maps.newHashMap();
+
+  // Map of Vertex ids to Vertex objects. Used primarily during the construction of the
+  // ColumnLineageGraph from a serialized JSON object.
+  private final Map<VertexId, Vertex> idToVertexMap_ = Maps.newHashMap();
+
+  // For an INSERT or a CTAS, these are the columns of the
+  // destination table plus any partitioning columns (when dynamic partitioning is used).
+  // For a SELECT stmt, they are the labels of the result exprs.
+  private final List<String> targetColumnLabels_ = Lists.newArrayList();
+
+  // Repository for tuple and slot descriptors for this query. Use it to construct the
+  // column lineage graph.
+  private DescriptorTable descTbl_;
+
+  private final IdGenerator<VertexId> vertexIdGenerator = VertexId.createGenerator();
+
+  public ColumnLineageGraph() { }
+
+  /**
+   * Private c'tor, used only for testing.
+   */
+  private ColumnLineageGraph(String stmt, String user, long timestamp) {
+    queryStr_ = stmt;
+    user_ = user;
+    timestamp_ = timestamp;
+  }
+
+  private void setVertices(Set<Vertex> vertices) {
+    for (Vertex vertex: vertices) {
+      vertices_.put(vertex.getLabel(), vertex);
+      idToVertexMap_.put(vertex.getVertexId(), vertex);
+    }
+  }
+
+  /**
+   * Creates a new MultiEdge in the column lineage graph from the sets of 'sources' and
+   * 'targets' labels (representing column names or result expr labels). The new
+   * MultiEdge object is returned.
+   */
+  private MultiEdge createMultiEdge(Set<String> targets, Set<String> sources,
+      MultiEdge.EdgeType type) {
+    Set<Vertex> targetVertices = Sets.newHashSet();
+    for (String target: targets) {
+      targetVertices.add(createVertex(target));
+    }
+    Set<Vertex> sourceVertices = Sets.newHashSet();
+    for (String source: sources) {
+      sourceVertices.add(createVertex(source));
+    }
+    MultiEdge edge = new MultiEdge(sourceVertices, targetVertices, type);
+    edges_.add(edge);
+    return edge;
+  }
+
+  /**
+   * Creates a new vertex in the column lineage graph. The new Vertex object is
+   * returned. If a Vertex with the same label already exists, reuse it.
+   */
+  private Vertex createVertex(String label) {
+    Vertex newVertex = vertices_.get(label);
+    if (newVertex != null) return newVertex;
+    newVertex = new Vertex(vertexIdGenerator.getNextId(), label);
+    vertices_.put(newVertex.getLabel(), newVertex);
+    idToVertexMap_.put(newVertex.getVertexId(), newVertex);
+    return newVertex;
+  }
+
+  /**
+   * Computes the column lineage graph of a query from the list of query result exprs.
+   * 'rootAnalyzer' is the Analyzer that was used for the analysis of the query.
+   */
+  public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
+    init(rootAnalyzer);
+    computeProjectionDependencies(resultExprs);
+    computeResultPredicateDependencies(rootAnalyzer);
+  }
+
+  /**
+   * Initialize the ColumnLineageGraph from the root analyzer of a query.
+   */
+  private void init(Analyzer analyzer) {
+    Preconditions.checkNotNull(analyzer);
+    Preconditions.checkState(analyzer.isRootAnalyzer());
+    TQueryCtx queryCtx = analyzer.getQueryCtx();
+    if (queryCtx.request.isSetRedacted_stmt()) {
+      queryStr_ = queryCtx.request.redacted_stmt;
+    } else {
+      queryStr_ = queryCtx.request.stmt;
+    }
+    Preconditions.checkNotNull(queryStr_);
+    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    try {
+      timestamp_ = df.parse(queryCtx.now_string).getTime() / 1000;
+    } catch (java.text.ParseException e) {
+      LOG.error("Error parsing timestamp value: " + queryCtx.now_string +
+          " " + e.getMessage());
+      timestamp_ = new Date().getTime() / 1000;
+    }
+    descTbl_ = analyzer.getDescTbl();
+    user_ = analyzer.getUser().getName();
+  }
+
+  private void computeProjectionDependencies(List<Expr> resultExprs) {
+    Preconditions.checkNotNull(resultExprs);
+    Preconditions.checkState(!resultExprs.isEmpty());
+    Preconditions.checkState(resultExprs.size() == targetColumnLabels_.size());
+    for (int i = 0; i < resultExprs.size(); ++i) {
+      Expr expr = resultExprs.get(i);
+      Set<String> sourceBaseCols = Sets.newHashSet();
+      List<Expr> dependentExprs = Lists.newArrayList();
+      getSourceBaseCols(expr, sourceBaseCols, dependentExprs, false);
+      Set<String> targets = Sets.newHashSet(targetColumnLabels_.get(i));
+      createMultiEdge(targets, sourceBaseCols, MultiEdge.EdgeType.PROJECTION);
+      if (!dependentExprs.isEmpty()) {
+        // We have additional exprs that 'expr' has a predicate dependency on.
+        // Gather the transitive predicate dependencies of 'expr' based on its direct
+        // predicate dependencies. For each direct predicate dependency p, 'expr' is
+        // transitively predicate dependent on all exprs that p is projection and
+        // predicate dependent on.
+        Set<String> predicateBaseCols = Sets.newHashSet();
+        for (Expr dependentExpr: dependentExprs) {
+          getSourceBaseCols(dependentExpr, predicateBaseCols, null, true);
+        }
+        createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE);
+      }
+    }
+  }
+
+  /**
+   * Compute predicate dependencies for the query result, i.e. exprs that affect the
+   * possible values of the result exprs / target columns, such as predicates in a WHERE
+   * clause.
+   */
+  private void computeResultPredicateDependencies(Analyzer analyzer) {
+    List<Expr> conjuncts = analyzer.getConjuncts();
+    for (Expr expr: conjuncts) {
+      if (expr.isAuxExpr()) continue;
+      resultDependencyPredicates_.add(expr);
+    }
+    Set<String> predicateBaseCols = Sets.newHashSet();
+    for (Expr expr: resultDependencyPredicates_) {
+      getSourceBaseCols(expr, predicateBaseCols, null, true);
+    }
+    if (predicateBaseCols.isEmpty()) return;
+    Set<String> targets = Sets.newHashSet(targetColumnLabels_);
+    createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE);
+  }
+
+  /**
+   * Identify the base table columns that 'expr' is connected to by recursively resolving
+   * all associated slots through inline views and materialization points to base-table
+   * slots. If 'directPredDeps' is not null, it is populated with the exprs that
+   * have a predicate dependency with 'expr' (e.g. partitioning and order by exprs for
+   * the case of an analytic function). If 'traversePredDeps' is false, not all the
+   * children exprs of 'expr' are used to identify the base columns that 'expr' is
+   * connected to. Which children are filtered depends on the type of 'expr' (e.g. for
+   * AnalyticFunctionExpr, grouping and sorting exprs are filtered out).
+   */
+  private void getSourceBaseCols(Expr expr, Set<String> sourceBaseCols,
+      List<Expr> directPredDeps, boolean traversePredDeps) {
+    List<Expr> exprsToTraverse = getProjectionDeps(expr);
+    List<Expr> predicateDepExprs = getPredicateDeps(expr);
+    if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs);
+    if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs);
+    List<SlotId> slotIds = Lists.newArrayList();
+    for (Expr e: exprsToTraverse) {
+      e.getIds(null, slotIds);
+    }
+    for (SlotId slotId: slotIds) {
+      SlotDescriptor slotDesc = descTbl_.getSlotDesc(slotId);
+      List<Expr> sourceExprs = slotDesc.getSourceExprs();
+      if (sourceExprs.isEmpty() && slotDesc.isScanSlot() &&
+          slotDesc.getPath().isRootedAtTuple()) {
+        // slot should correspond to a materialized tuple of a table
+        Preconditions.checkState(slotDesc.getParent().isMaterialized());
+        List<String> path = slotDesc.getPath().getCanonicalPath();
+        sourceBaseCols.add(Joiner.on(".").join(path));
+      } else {
+        for (Expr sourceExpr: sourceExprs) {
+          getSourceBaseCols(sourceExpr, sourceBaseCols, directPredDeps,
+              traversePredDeps);
+        }
+      }
+    }
+  }
+
+  /**
+   * Retrieve the exprs that 'e' is directly projection dependent on.
+   * TODO Handle conditional exprs (e.g. CASE, IF).
+   */
+  private List<Expr> getProjectionDeps(Expr e) {
+    Preconditions.checkNotNull(e);
+    List<Expr> outputExprs = Lists.newArrayList();
+    if (e instanceof AnalyticExpr) {
+      AnalyticExpr analytic = (AnalyticExpr) e;
+      outputExprs.addAll(analytic.getChildren().subList(0,
+          analytic.getFnCall().getParams().size()));
+    } else {
+      outputExprs.add(e);
+    }
+    return outputExprs;
+  }
+
+  /**
+   * Retrieve the exprs that 'e' is directly predicate dependent on.
+   * TODO Handle conditional exprs (e.g. CASE, IF).
+   */
+  private List<Expr> getPredicateDeps(Expr e) {
+    Preconditions.checkNotNull(e);
+    List<Expr> outputExprs = Lists.newArrayList();
+    if (e instanceof AnalyticExpr) {
+      AnalyticExpr analyticExpr = (AnalyticExpr) e;
+      outputExprs.addAll(analyticExpr.getPartitionExprs());
+      for (OrderByElement orderByElem: analyticExpr.getOrderByElements()) {
+        outputExprs.add(orderByElem.getExpr());
+      }
+    }
+    return outputExprs;
+  }
+
+  public void addDependencyPredicates(Collection<Expr> exprs) {
+    resultDependencyPredicates_.addAll(exprs);
+  }
+
+  /**
+   * Encodes the ColumnLineageGraph object to JSON.
+   */
+  public String toJson() {
+    if (Strings.isNullOrEmpty(queryStr_)) return "";
+    Map obj = new LinkedHashMap();
+    obj.put("queryText", queryStr_);
+    obj.put("hash", getQueryHash(queryStr_));
+    obj.put("user", user_);
+    obj.put("timestamp", timestamp_);
+    // Add edges
+    JSONArray edges = new JSONArray();
+    for (MultiEdge edge: edges_) {
+      edges.add(edge.toJson());
+    }
+    obj.put("edges", edges);
+    // Add vertices
+    TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
+    JSONArray vertices = new JSONArray();
+    for (Vertex vertex: sortedVertices) {
+      vertices.add(vertex.toJson());
+    }
+    obj.put("vertices", vertices);
+    return JSONValue.toJSONString(obj);
+  }
+
+  /**
+   * Serializes the ColumnLineageGraph to a thrift object
+   */
+  public TLineageGraph toThrift() {
+    TLineageGraph graph = new TLineageGraph();
+    if (Strings.isNullOrEmpty(queryStr_)) return graph;
+    graph.setQuery_text(queryStr_);
+    graph.setHash(getQueryHash(queryStr_));
+    graph.setUser(user_);
+    graph.setStarted(timestamp_);
+    // Add edges
+    List<TMultiEdge> edges = Lists.newArrayList();
+    for (MultiEdge edge: edges_) {
+      edges.add(edge.toThrift());
+    }
+    graph.setEdges(edges);
+    // Add vertices
+    TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
+    List<TVertex> vertices = Lists.newArrayList();
+    for (Vertex vertex: sortedVertices) {
+      vertices.add(vertex.toThrift());
+    }
+    graph.setVertices(vertices);
+    return graph;
+  }
+
+  /**
+   * Creates a LineageGraph object from a thrift object
+   */
+  public static ColumnLineageGraph fromThrift(TLineageGraph obj) {
+    ColumnLineageGraph lineage =
+        new ColumnLineageGraph(obj.query_text, obj.user, obj.started);
+    TreeSet<Vertex> vertices = Sets.newTreeSet();
+    for (TVertex vertex: obj.vertices) {
+      vertices.add(Vertex.fromThrift(vertex));
+    }
+    lineage.setVertices(vertices);
+    for (TMultiEdge edge: obj.edges) {
+      MultiEdge e = MultiEdge.fromThrift(edge);
+      lineage.edges_.add(e);
+    }
+    return lineage;
+  }
+
+  private String getQueryHash(String queryStr) {
+    Hasher hasher = Hashing.md5().newHasher();
+    hasher.putString(queryStr);
+    return hasher.hash().toString();
+  }
+
+  /**
+   * Creates a ColumnLineageGraph object from a serialized JSON record. The new
+   * ColumnLineageGraph object is returned. Used only during testing.
+   */
+  public static ColumnLineageGraph createFromJSON(String json) {
+    if (json == null || json.isEmpty()) return null;
+    JSONParser parser = new JSONParser();
+    Object obj = null;
+    try {
+      obj = parser.parse(json);
+    } catch (ParseException e) {
+      LOG.error("Error parsing serialized column lineage graph: " + 
e.getMessage());
+      return null;
+    }
+    if (!(obj instanceof JSONObject)) return null;
+    JSONObject jsonObj = (JSONObject) obj;
+    String stmt = (String) jsonObj.get("queryText");
+    String hash = (String) jsonObj.get("hash");
+    String user = (String) jsonObj.get("user");
+    long timestamp = (Long) jsonObj.get("timestamp");
+    ColumnLineageGraph graph = new ColumnLineageGraph(stmt, user, timestamp);
+    JSONArray serializedVertices = (JSONArray) jsonObj.get("vertices");
+    Set<Vertex> vertices = Sets.newHashSet();
+    for (int i = 0; i < serializedVertices.size(); ++i) {
+      Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i));
+      vertices.add(v);
+    }
+    graph.setVertices(vertices);
+    JSONArray serializedEdges = (JSONArray) jsonObj.get("edges");
+    for (int i = 0; i < serializedEdges.size(); ++i) {
+      MultiEdge e =
+          graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i));
+      graph.edges_.add(e);
+    }
+    return graph;
+  }
+
+  private MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge) {
+    Preconditions.checkNotNull(jsonEdge);
+    JSONArray sources = (JSONArray) jsonEdge.get("sources");
+    Set<Vertex> sourceVertices = getVerticesFromJSONArray(sources);
+    JSONArray targets = (JSONArray) jsonEdge.get("targets");
+    Set<Vertex> targetVertices = getVerticesFromJSONArray(targets);
+    MultiEdge.EdgeType type =
+        MultiEdge.EdgeType.valueOf((String) jsonEdge.get("edgeType"));
+    return new MultiEdge(sourceVertices, targetVertices, type);
+  }
+
+  private Set<Vertex> getVerticesFromJSONArray(JSONArray vertexIdArray) {
+    Set<Vertex> vertices = Sets.newHashSet();
+    for (int i = 0; i < vertexIdArray.size(); ++i) {
+      int sourceId = ((Long) vertexIdArray.get(i)).intValue();
+      Vertex sourceVertex = idToVertexMap_.get(new VertexId(sourceId));
+      Preconditions.checkNotNull(sourceVertex);
+      vertices.add(sourceVertex);
+    }
+    return vertices;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (obj.getClass() != this.getClass()) return false;
+    ColumnLineageGraph g = (ColumnLineageGraph) obj;
+    if (!this.vertices_.equals(g.vertices_) ||
+        !this.edges_.equals(g.edges_)) {
+      return false;
+    }
+    return true;
+  }
+
+  public String debugString() {
+    StringBuilder builder = new StringBuilder();
+    for (MultiEdge edge: edges_) {
+      builder.append(edge.toString() + "\n");
+    }
+    builder.append(toJson());
+    return builder.toString();
+  }
+
+  public void addTargetColumnLabels(Collection<String> columnLabels) {
+    Preconditions.checkNotNull(columnLabels);
+    targetColumnLabels_.addAll(columnLabels);
+  }
+
+  public void addTargetColumnLabels(Table dstTable) {
+    Preconditions.checkNotNull(dstTable);
+    String tblFullName = dstTable.getFullName();
+    for (String columnName: dstTable.getColumnNames()) {
+      targetColumnLabels_.add(tblFullName + "." + columnName);
+    }
+  }
+}
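
For reference, the lineage record this class serializes has the shape sketched below. This is a hand-written illustration, not output captured from Impala: the query, table, user, timestamp, hash and vertex ids are made-up placeholder values, and the layout simply follows the toJson()/createFromJSON() methods above (one PROJECTION edge per result expr, plus one PREDICATE edge covering the columns referenced by the query's conjuncts).

    // Minimal sketch (not part of this patch), assuming a hypothetical query
    // "select int_col from functional.alltypes where id > 10".
    String json =
        "{\"queryText\":\"select int_col from functional.alltypes where id > 10\","
        + "\"hash\":\"<md5 of queryText>\","
        + "\"user\":\"impala\","
        + "\"timestamp\":1446159271,"
        // One PROJECTION edge per result expr; one PREDICATE edge that targets all
        // result exprs and depends on the WHERE-clause columns.
        + "\"edges\":["
        +   "{\"sources\":[1],\"targets\":[0],\"edgeType\":\"PROJECTION\"},"
        +   "{\"sources\":[2],\"targets\":[0],\"edgeType\":\"PREDICATE\"}],"
        // Vertex 0 is the result expr label; vertices 1 and 2 are fully-qualified
        // base table columns.
        + "\"vertices\":["
        +   "{\"id\":0,\"vertexType\":\"COLUMN\",\"vertexId\":\"int_col\"},"
        +   "{\"id\":1,\"vertexType\":\"COLUMN\",\"vertexId\":\"functional.alltypes.int_col\"},"
        +   "{\"id\":2,\"vertexType\":\"COLUMN\",\"vertexId\":\"functional.alltypes.id\"}]}";
    // createFromJSON() (used by tests) rebuilds the graph from such a string.
    ColumnLineageGraph graph = ColumnLineageGraph.createFromJSON(json);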

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
new file mode 100644
index 0000000..4869004
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.Function.CompareMode;
+import com.cloudera.impala.catalog.ScalarFunction;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TExprNode;
+import com.cloudera.impala.thrift.TExprNodeType;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * &&, ||, ! predicates.
+ *
+ */
+public class CompoundPredicate extends Predicate {
+  public enum Operator {
+    AND("AND"),
+    OR("OR"),
+    NOT("NOT");
+
+    private final String description;
+
+    private Operator(String description) {
+      this.description = description;
+    }
+
+    @Override
+    public String toString() {
+      return description;
+    }
+  }
+  private final Operator op_;
+
+  public static void initBuiltins(Db db) {
+    // AND and OR are implemented as custom exprs, so they do not have a function symbol.
+    db.addBuiltin(ScalarFunction.createBuiltinOperator(
+        Operator.AND.name(), "",
+        Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN));
+    db.addBuiltin(ScalarFunction.createBuiltinOperator(
+        Operator.OR.name(), "",
+        Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN));
+    db.addBuiltin(ScalarFunction.createBuiltinOperator(
+        Operator.NOT.name(), "impala::CompoundPredicate::Not",
+        Lists.<Type>newArrayList(Type.BOOLEAN), Type.BOOLEAN));
+  }
+
+  public CompoundPredicate(Operator op, Expr e1, Expr e2) {
+    super();
+    this.op_ = op;
+    Preconditions.checkNotNull(e1);
+    children_.add(e1);
+    Preconditions.checkArgument(op == Operator.NOT && e2 == null
+        || op != Operator.NOT && e2 != null);
+    if (e2 != null) children_.add(e2);
+  }
+
+  /**
+   * Copy c'tor used in clone().
+   */
+  protected CompoundPredicate(CompoundPredicate other) {
+    super(other);
+    op_ = other.op_;
+  }
+
+  public Operator getOp() { return op_; }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!super.equals(obj)) return false;
+    return ((CompoundPredicate) obj).op_ == op_;
+  }
+
+  @Override
+  public String debugString() {
+    return Objects.toStringHelper(this)
+        .add("op", op_)
+        .addValue(super.debugString())
+        .toString();
+  }
+
+  @Override
+  public String toSqlImpl() {
+    if (children_.size() == 1) {
+      Preconditions.checkState(op_ == Operator.NOT);
+      return "NOT " + getChild(0).toSql();
+    } else {
+      return getChild(0).toSql() + " " + op_.toString() + " " + getChild(1).toSql();
+    }
+  }
+
+  @Override
+  protected void toThrift(TExprNode msg) {
+    msg.node_type = TExprNodeType.COMPOUND_PRED;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (isAnalyzed_) return;
+    super.analyze(analyzer);
+
+    // Check that children are predicates.
+    for (Expr e: children_) {
+      if (!e.getType().isBoolean() && !e.getType().isNull()) {
+        throw new AnalysisException(String.format("Operand '%s' part of predicate " +
+            "'%s' should return type 'BOOLEAN' but returns type '%s'.",
+            e.toSql(), toSql(), e.getType().toSql()));
+      }
+    }
+
+    fn_ = getBuiltinFunction(analyzer, op_.toString(), collectChildReturnTypes(),
+        CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
+    Preconditions.checkState(fn_ != null);
+    Preconditions.checkState(fn_.getReturnType().isBoolean());
+    castForFunctionCall(false);
+    if (hasChildCosts()) evalCost_ = getChildCosts() + COMPOUND_PREDICATE_COST;
+
+    if (!getChild(0).hasSelectivity() ||
+        (children_.size() == 2 && !getChild(1).hasSelectivity())) {
+      // Give up if one of our children has an unknown selectivity.
+      selectivity_ = -1;
+      return;
+    }
+
+    switch (op_) {
+      case AND:
+        selectivity_ = getChild(0).selectivity_ * getChild(1).selectivity_;
+        break;
+      case OR:
+        selectivity_ = getChild(0).selectivity_ + getChild(1).selectivity_
+            - getChild(0).selectivity_ * getChild(1).selectivity_;
+        break;
+      case NOT:
+        selectivity_ = 1.0 - getChild(0).selectivity_;
+        break;
+    }
+    selectivity_ = Math.max(0.0, Math.min(1.0, selectivity_));
+  }
+
+  /**
+   * Retrieve the slots bound by BinaryPredicate, InPredicate and
+   * CompoundPredicates in the subtree rooted at 'this'.
+   */
+  public ArrayList<SlotRef> getBoundSlots() {
+    ArrayList<SlotRef> slots = Lists.newArrayList();
+    for (int i = 0; i < getChildren().size(); ++i) {
+      if (getChild(i) instanceof BinaryPredicate ||
+          getChild(i) instanceof InPredicate) {
+        slots.add(((Predicate)getChild(i)).getBoundSlot());
+      } else if (getChild(i) instanceof CompoundPredicate) {
+        slots.addAll(((CompoundPredicate)getChild(i)).getBoundSlots());
+      }
+    }
+    return slots;
+  }
+
+  /**
+   * Negates a CompoundPredicate.
+   */
+  @Override
+  public Expr negate() {
+    if (op_ == Operator.NOT) return getChild(0);
+    Expr negatedLeft = getChild(0).negate();
+    Expr negatedRight = getChild(1).negate();
+    Operator newOp = (op_ == Operator.OR) ? Operator.AND : Operator.OR;
+    return new CompoundPredicate(newOp, negatedLeft, negatedRight);
+  }
+
+  /**
+   * Creates a conjunctive predicate from a list of exprs.
+   */
+  public static Expr createConjunctivePredicate(List<Expr> conjuncts) {
+    Expr conjunctivePred = null;
+    for (Expr expr: conjuncts) {
+      if (conjunctivePred == null) {
+        conjunctivePred = expr;
+        continue;
+      }
+      conjunctivePred = new CompoundPredicate(CompoundPredicate.Operator.AND,
+          expr, conjunctivePred);
+    }
+    return conjunctivePred;
+  }
+
+  @Override
+  public Expr clone() { return new CompoundPredicate(this); }
+
+  // Create an AND predicate between two exprs, 'lhs' and 'rhs'. If
+  // 'rhs' is null, simply return 'lhs'.
+  public static Expr createConjunction(Expr lhs, Expr rhs) {
+    if (rhs == null) return lhs;
+    return new CompoundPredicate(Operator.AND, rhs, lhs);
+  }
+}
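
As a small usage sketch (not from this patch): assuming p1, p2 and p3 are already-constructed boolean predicates (e.g. BinaryPredicates produced during analysis; how they are built is omitted here), the static helpers above compose like this.

    // Hypothetical, already-analyzed boolean exprs; construction omitted.
    List<Expr> conjuncts = Lists.newArrayList(p1, p2, p3);

    // createConjunctivePredicate() folds the list into nested ANDs,
    // yielding roughly p3 AND (p2 AND p1).
    Expr combined = CompoundPredicate.createConjunctivePredicate(conjuncts);

    // createConjunction() is the two-argument convenience form; a null
    // right-hand side simply returns the left-hand expr unchanged.
    Expr alsoCombined = CompoundPredicate.createConjunction(combined, null);

    // negate() applies De Morgan's laws: a NOT is stripped, AND/OR are swapped,
    // and the negation is pushed down to the children via their own negate().
    Expr negated = combined.negate();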

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
new file mode 100644
index 0000000..cd01713
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -0,0 +1,553 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.log4j.Logger;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.Column;
+import com.cloudera.impala.catalog.HBaseTable;
+import com.cloudera.impala.catalog.HdfsPartition;
+import com.cloudera.impala.catalog.HdfsTable;
+import com.cloudera.impala.catalog.Table;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.catalog.View;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.PrintUtils;
+import com.cloudera.impala.thrift.TComputeStatsParams;
+import com.cloudera.impala.thrift.TPartitionStats;
+import com.cloudera.impala.thrift.TTableName;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents a COMPUTE STATS <table> and COMPUTE INCREMENTAL STATS <table> [PARTITION
+ * <part_spec>] statement for statistics collection. The former statement gathers all
+ * table and column stats for a given table and stores them in the Metastore via the
+ * CatalogService. All existing stats for that table are replaced and no existing stats
+ * are reused. The latter, incremental form, similarly computes stats for the whole table
+ * but does so by re-using stats from partitions which have 'valid' statistics. Statistics
+ * are currently 'valid' if they exist; in the future they may be expired based on
+ * recency etc.
+ *
+ * TODO: Allow more coarse/fine grained (db, column)
+ * TODO: Compute stats on complex types.
+ */
+public class ComputeStatsStmt extends StatementBase {
+  private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class);
+
+  private static String AVRO_SCHEMA_MSG_PREFIX = "Cannot COMPUTE STATS on Avro table " +
+      "'%s' because its column definitions do not match those in the Avro schema.";
+  private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
+          "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
+
+  protected final TableName tableName_;
+
+  // Set during analysis.
+  protected Table table_;
+
+  // The Null count is not currently being used in optimization or run-time,
+  // and compute stats runs 2x faster in many cases when not counting NULLs.
+  private static final boolean COUNT_NULLS = false;
+
+  // Query for getting the per-partition row count and the total row count.
+  // Set during analysis.
+  protected String tableStatsQueryStr_;
+
+  // Query for getting the per-column NDVs and number of NULLs.
+  // Set during analysis.
+  protected String columnStatsQueryStr_;
+
+  // If true, stats will be gathered incrementally per-partition.
+  private boolean isIncremental_ = false;
+
+  // If true, expect the compute stats process to produce output for all partitions in the
+  // target table (only meaningful, therefore, if partitioned). This is always true for
+  // non-incremental computations. If set, expectedPartitions_ will be empty - the point
+  // of this flag is to optimise the case where all partitions are targeted.
+  private boolean expectAllPartitions_ = false;
+
+  // The list of valid partition statistics that can be used in an incremental computation
+  // without themselves being recomputed. Populated in analyze().
+  private final List<TPartitionStats> validPartStats_ = Lists.newArrayList();
+
+  // For incremental computations, the list of partitions (identified by list of partition
+  // column values) that we expect to receive results for. Used to ensure that even empty
+  // partitions emit results.
+  // TODO: Consider using partition IDs (and adding them to the child queries with a
+  // PARTITION_ID() builtin)
+  // PARTITION_ID() builtin)
+  private final List<List<String>> expectedPartitions_ = Lists.newArrayList();
+
+  // If non-null, the partition that an incremental computation might apply to. Must be
+  // null if this is a non-incremental computation.
+  private PartitionSpec partitionSpec_ = null;
+
+  // The maximum number of partitions that may be explicitly selected by filter
+  // predicates. Any query that selects more than this automatically drops back to a full
+  // incremental stats recomputation.
+  // TODO: We can probably do better than this, e.g. running several queries, each of
+  // which selects up to MAX_INCREMENTAL_PARTITIONS partitions.
+  // which selects up to MAX_INCREMENTAL_PARTITIONS partitions.
+  private static final int MAX_INCREMENTAL_PARTITIONS = 1000;
+
+  /**
+   * Constructor for the non-incremental form of COMPUTE STATS.
+   */
+  protected ComputeStatsStmt(TableName tableName) {
+    this(tableName, false, null);
+  }
+
+  /**
+   * Constructor for the incremental form of COMPUTE STATS. If isIncremental is true,
+   * statistics will be recomputed incrementally; if false they will be recomputed for the
+   * whole table. The partition spec partSpec can specify a single partition whose stats
+   * should be recomputed.
+   */
+  protected ComputeStatsStmt(TableName tableName, boolean isIncremental,
+      PartitionSpec partSpec) {
+    Preconditions.checkState(tableName != null && !tableName.isEmpty());
+    Preconditions.checkState(isIncremental || partSpec == null);
+    this.tableName_ = tableName;
+    this.table_ = null;
+    this.isIncremental_ = isIncremental;
+    this.partitionSpec_ = partSpec;
+    if (partitionSpec_ != null) {
+      partitionSpec_.setTableName(tableName);
+      partitionSpec_.setPrivilegeRequirement(Privilege.ALTER);
+    }
+  }
+
+  /**
+   * Utility method for constructing the child queries to add partition columns to both a
+   * select list and a group-by list; the former are wrapped in a cast to a string.
+   */
+  private void addPartitionCols(HdfsTable table, List<String> selectList,
+      List<String> groupByCols) {
+    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
+      String colRefSql = ToSqlUtils.getIdentSql(table.getColumns().get(i).getName());
+      groupByCols.add(colRefSql);
+      // For the select list, wrap the group by columns in a cast to string because
+      // the Metastore stores them as strings.
+      selectList.add(colRefSql);
+    }
+  }
+
+  private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) {
+    List<String> columnStatsSelectList = Lists.newArrayList();
+    // For Hdfs tables, exclude partition columns from stats gathering because Hive
+    // cannot store them as part of the non-partition column stats. For HBase tables,
+    // include the single clustering column (the row key).
+    int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols();
+    final String ndvUda = isIncremental_ ? "NDV_NO_FINALIZE" : "NDV";
+
+    for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
+      Column c = table_.getColumns().get(i);
+      Type type = c.getType();
+
+      // Ignore columns with an invalid/unsupported type. For example, complex types in
+      // an HBase-backed table will appear as invalid types.
+      if (!type.isValid() || !type.isSupported()
+          || c.getType().isComplexType()) {
+        continue;
+      }
+      // NDV approximation function. Add explicit alias for later identification when
+      // updating the Metastore.
+      String colRefSql = ToSqlUtils.getIdentSql(c.getName());
+      columnStatsSelectList.add(ndvUda + "(" + colRefSql + ") AS " + colRefSql);
+
+      if (COUNT_NULLS) {
+        // Count the number of NULL values.
+        columnStatsSelectList.add("COUNT(IF(" + colRefSql + " IS NULL, 1, 
NULL))");
+      } else {
+        // Using -1 to indicate "unknown". We need cast to BIGINT because backend expects
+        // an i64Val as the number of NULLs returned by the COMPUTE STATS column stats
+        // child query. See CatalogOpExecutor::SetColumnStats(). If we do not cast, then
+        // the -1 will be treated as TINYINT resulting in a 0 being placed in the #NULLs
+        // column (see IMPALA-1068).
+        columnStatsSelectList.add("CAST(-1 as BIGINT)");
+      }
+
+      // For STRING columns also compute the max and avg string length.
+      if (type.isStringType()) {
+        columnStatsSelectList.add("MAX(length(" + colRefSql + "))");
+        columnStatsSelectList.add("AVG(length(" + colRefSql + "))");
+      } else {
+        // For non-STRING columns we use the fixed size of the type.
+        // We store the same information for all types to avoid having to
+        // treat STRING columns specially in the BE CatalogOpExecutor.
+        Integer typeSize = type.getPrimitiveType().getSlotSize();
+        columnStatsSelectList.add(typeSize.toString());
+        columnStatsSelectList.add("CAST(" + typeSize.toString() + " as 
DOUBLE)");
+      }
+
+      if (isIncremental_) {
+        // Need the count in order to properly combine per-partition column stats
+        columnStatsSelectList.add("COUNT(" + colRefSql + ")");
+      }
+    }
+    return columnStatsSelectList;
+  }
+
+  /**
+   * Constructs two queries to compute statistics for 'tableName_', if that table exists
+   * (although if we can detect that no work needs to be done for either query, that query
+   * will be 'null' and not executed).
+   *
+   * The first query computes the number of rows (on a per-partition basis if the table is
+   * partitioned) and has the form "SELECT COUNT(*) FROM tbl GROUP BY part_col1,
+   * part_col2...", with an optional WHERE clause for incremental computation (see below).
+   *
+   * The second query computes the NDV estimate, the average width, the maximum width and,
+   * optionally, the number of nulls for each column. For non-partitioned tables (or
+   * non-incremental computations), the query is simple:
+   *
+   * SELECT NDV(col), COUNT(<nulls>), MAX(length(col)), AVG(length(col)) FROM tbl
+   *
+   * (For non-string columns, the widths are hard-coded as they are known at query
+   * construction time).
+   *
+   * If computation is incremental (i.e. the original statement was COMPUTE INCREMENTAL
+   * STATS.., and the underlying table is a partitioned HdfsTable), some modifications are
+   * made to the non-incremental per-column query. First, a different UDA,
+   * NDV_NO_FINALIZE() is used to retrieve and serialise the intermediate state from each
+   * column. Second, the results are grouped by partition, as with the row count query, so
+   * that the intermediate NDV computation state can be stored per-partition. The number
+   * of rows per partition is also recorded.
+   *
+   * For both the row count query, and the column stats query, the query's WHERE clause is
+   * used to restrict execution only to partitions that actually require new statistics to
+   * be computed.
+   *
+   * SELECT NDV_NO_FINALIZE(col), <nulls, max, avg>, COUNT(col) FROM tbl
+   * GROUP BY part_col1, part_col2, ...
+   * WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR
+   *       ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...
+   */
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    table_ = analyzer.getTable(tableName_, Privilege.ALTER);
+    String sqlTableName = table_.getTableName().toSql();
+    if (table_ instanceof View) {
+      throw new AnalysisException(String.format(
+          "COMPUTE STATS not supported for view %s", sqlTableName));
+    }
+
+    if (!(table_ instanceof HdfsTable)) {
+      if (partitionSpec_ != null) {
+        throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not 
supported " +
+            "for non-HDFS table " + table_.getTableName());
+      }
+      isIncremental_ = false;
+    }
+
+    // Ensure that we write an entry for every partition if this isn't incremental
+    if (!isIncremental_) expectAllPartitions_ = true;
+
+    HdfsTable hdfsTable = null;
+    if (table_ instanceof HdfsTable) {
+      hdfsTable = (HdfsTable)table_;
+      if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
+          partitionSpec_ != null) {
+          throw new AnalysisException(String.format(
+              "Can't compute PARTITION stats on an unpartitioned table: %s",
+              sqlTableName));
+      } else if (partitionSpec_ != null) {
+          partitionSpec_.setPartitionShouldExist();
+          partitionSpec_.analyze(analyzer);
+          for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
+            // TODO: We could match the dynamic keys (i.e. as wildcards) as well, but that
+            // would involve looping over all partitions and seeing which match the
+            // partition spec.
+            if (!kv.isStatic()) {
+              throw new AnalysisException("All partition keys must have 
values: " +
+                  kv.toString());
+            }
+          }
+      }
+      // For incremental stats, estimate the size of intermediate stats and report an
+      // error if the estimate is greater than MAX_INCREMENTAL_STATS_SIZE_BYTES.
+      if (isIncremental_) {
+        long statsSizeEstimate = hdfsTable.getColumns().size() *
+            hdfsTable.getPartitions().size() * HdfsTable.STATS_SIZE_PER_COLUMN_BYTES;
+        if (statsSizeEstimate > HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES) {
+          LOG.error("Incremental stats size estimate for table " + 
hdfsTable.getName() +
+              " exceeded " + HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES + ", 
estimate = "
+              + statsSizeEstimate);
+          throw new AnalysisException("Incremental stats size estimate exceeds 
"
+              + 
PrintUtils.printBytes(HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES)
+              + ". Please try COMPUTE STATS instead.");
+        }
+      }
+    }
+
+    // Build partition filters that only select partitions without valid statistics for
+    // incremental computation.
+    List<String> filterPreds = Lists.newArrayList();
+    if (isIncremental_) {
+      if (partitionSpec_ == null) {
+        // If any column does not have stats, we recompute statistics for all partitions
+        // TODO: need a better way to invalidate stats for all partitions, so that we can
+        // use this logic to only recompute new / changed columns.
+        boolean tableIsMissingColStats = false;
+
+        // We'll warn the user if a column is missing stats (and therefore we rescan the
+        // whole table), but if all columns are missing stats, the table just doesn't have
+        // any stats and there's no need to warn.
+        boolean allColumnsMissingStats = true;
+        String exampleColumnMissingStats = null;
+        // Partition columns always have stats, so exclude them from this search
+        for (Column col: table_.getNonClusteringColumns()) {
+          if (!col.getStats().hasStats()) {
+            if (!tableIsMissingColStats) {
+              tableIsMissingColStats = true;
+              exampleColumnMissingStats = col.getName();
+            }
+          } else {
+            allColumnsMissingStats = false;
+          }
+        }
+
+        if (tableIsMissingColStats && !allColumnsMissingStats) {
+          analyzer.addWarning("Column " + exampleColumnMissingStats +
+              " does not have statistics, recomputing stats for the whole 
table");
+        }
+
+        for (HdfsPartition p: hdfsTable.getPartitions()) {
+          if (p.isDefaultPartition()) continue;
+          TPartitionStats partStats = p.getPartitionStats();
+          if (!p.hasIncrementalStats() || tableIsMissingColStats) {
+            if (partStats == null) LOG.trace(p.toString() + " does not have stats");
+            if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql());
+            List<String> partValues = Lists.newArrayList();
+            for (LiteralExpr partValue: p.getPartitionValues()) {
+              partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue,
+                  "NULL"));
+            }
+            expectedPartitions_.add(partValues);
+          } else {
+            LOG.trace(p.toString() + " does have statistics");
+            validPartStats_.add(partStats);
+          }
+        }
+        if (expectedPartitions_.size() == hdfsTable.getPartitions().size() - 1) {
+          expectedPartitions_.clear();
+          expectAllPartitions_ = true;
+        }
+      } else {
+        // Always compute stats on a particular partition when told to.
+        List<String> partitionConjuncts = Lists.newArrayList();
+        for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
+          partitionConjuncts.add(kv.toPredicateSql());
+        }
+        filterPreds.add("(" + Joiner.on(" AND ").join(partitionConjuncts) + 
")");
+        HdfsPartition targetPartition =
+            hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
+        List<String> partValues = Lists.newArrayList();
+        for (LiteralExpr partValue: targetPartition.getPartitionValues()) {
+          partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue,
+              "NULL"));
+        }
+        expectedPartitions_.add(partValues);
+        for (HdfsPartition p: hdfsTable.getPartitions()) {
+          if (p.isDefaultPartition()) continue;
+          if (p == targetPartition) continue;
+          TPartitionStats partStats = p.getPartitionStats();
+          if (partStats != null) validPartStats_.add(partStats);
+        }
+      }
+
+      if (filterPreds.size() == 0 && validPartStats_.size() != 0) {
+        LOG.info("No partitions selected for incremental stats update");
+        analyzer.addWarning("No partitions selected for incremental stats 
update");
+        return;
+      }
+    }
+
+    if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) {
+      // TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then
+      // advising the user to iterate.
+      analyzer.addWarning(
+          "Too many partitions selected, doing full recomputation of 
incremental stats");
+      filterPreds.clear();
+      validPartStats_.clear();
+    }
+
+    List<String> groupByCols = Lists.newArrayList();
+    List<String> partitionColsSelectList = Lists.newArrayList();
+    // Only add group by clause for HdfsTables.
+    if (hdfsTable != null) {
+      if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
+      addPartitionCols(hdfsTable, partitionColsSelectList, groupByCols);
+    }
+
+    // Query for getting the per-partition row count and the total row count.
+    StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT ");
+    List<String> tableStatsSelectList = Lists.newArrayList();
+    tableStatsSelectList.add("COUNT(*)");
+
+    tableStatsSelectList.addAll(partitionColsSelectList);
+    tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList));
+    tableStatsQueryBuilder.append(" FROM " + sqlTableName);
+
+    // Query for getting the per-column NDVs and number of NULLs.
+    List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer);
+
+    if (isIncremental_) columnStatsSelectList.addAll(partitionColsSelectList);
+
+    StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT ");
+    columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList));
+    columnStatsQueryBuilder.append(" FROM " + sqlTableName);
+
+    // Add the WHERE clause to filter out partitions that we don't want to compute
+    // incremental stats for. While this is a win in most situations, we would like to
+    // avoid this where it does no useful work (i.e. it selects all rows). This happens
+    // when there are no existing valid partitions (so all partitions will have been
+    // selected in) and there is no partition spec (so no single partition was explicitly
+    // selected in).
+    if (filterPreds.size() > 0 &&
+        (validPartStats_.size() > 0 || partitionSpec_ != null)) {
+      String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds);
+      columnStatsQueryBuilder.append(filterClause);
+      tableStatsQueryBuilder.append(filterClause);
+    }
+
+    if (groupByCols.size() > 0) {
+      String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols);
+      if (isIncremental_) columnStatsQueryBuilder.append(groupBy);
+      tableStatsQueryBuilder.append(groupBy);
+    }
+
+    tableStatsQueryStr_ = tableStatsQueryBuilder.toString();
+    LOG.debug("Table stats query: " + tableStatsQueryStr_);
+
+    if (columnStatsSelectList.isEmpty()) {
+      // Table doesn't have any columns that we can compute stats for.
+      LOG.info("No supported column types in table " + table_.getTableName() +
+          ", no column statistics will be gathered.");
+      columnStatsQueryStr_ = null;
+      return;
+    }
+
+    columnStatsQueryStr_ = columnStatsQueryBuilder.toString();
+    LOG.debug("Column stats query: " + columnStatsQueryStr_);
+  }
+
+  /**
+   * Checks whether the column definitions from the CREATE TABLE stmt match the columns
+   * in the Avro schema. If there is a mismatch, then COMPUTE STATS cannot update the
+   * statistics in the Metastore's backend DB due to HIVE-6308. Throws an
+   * AnalysisException for such ill-created Avro tables. Does nothing if
+   * the column definitions match the Avro schema exactly.
+   */
+  private void checkIncompleteAvroSchema(HdfsTable table) throws AnalysisException {
+    Preconditions.checkState(table.isAvroTable());
+    org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
+    // The column definitions from 'CREATE TABLE (column definitions) ...'
+    Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator();
+    // The columns derived from the Avro schema file or literal schema.
+    // Inconsistencies between the Avro-schema columns and the column definitions
+    // are sometimes resolved in the CREATE TABLE, and sometimes not (see below).
+    Iterator<Column> avroSchemaCols = table.getColumns().iterator();
+    // Skip partition columns from 'table' since those are not present in
+    // the msTable field schemas.
+    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
+      if (avroSchemaCols.hasNext()) avroSchemaCols.next();
+    }
+    int pos = 0;
+    while (colDefs.hasNext() || avroSchemaCols.hasNext()) {
+      if (colDefs.hasNext() && avroSchemaCols.hasNext()) {
+        FieldSchema colDef = colDefs.next();
+        Column avroSchemaCol = avroSchemaCols.next();
+        // Check that the column names are identical. Ignore mismatched types
+        // as those will either fail in the scan or succeed.
+        if (!colDef.getName().equalsIgnoreCase(avroSchemaCol.getName())) {
+          throw new AnalysisException(
+              String.format(AVRO_SCHEMA_MSG_PREFIX +
+                  "\nDefinition of column '%s' of type '%s' does not match " +
+                  "the Avro-schema column '%s' of type '%s' at position 
'%s'.\n" +
+                  AVRO_SCHEMA_MSG_SUFFIX,
+                  table.getName(), colDef.getName(), colDef.getType(),
+                  avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
+        }
+      }
+      // The following two cases are typically not possible because Hive resolves
+      // inconsistencies between the column-definition list and the Avro schema if a
+      // column-definition list was given in the CREATE TABLE (having no column
+      // definitions at all results in HIVE-6308). Even so, we check these cases for
+      // extra safety. COMPUTE STATS could be made to succeed in special instances of
+      // the cases below but we chose to throw an AnalysisException to avoid confusion
+      // because this scenario "should" never arise as mentioned above.
+      if (colDefs.hasNext() && !avroSchemaCols.hasNext()) {
+        FieldSchema colDef = colDefs.next();
+        throw new AnalysisException(
+            String.format(AVRO_SCHEMA_MSG_PREFIX +
+                "\nMissing Avro-schema column corresponding to column " +
+                "definition '%s' of type '%s' at position '%s'.\n" +
+                AVRO_SCHEMA_MSG_SUFFIX,
+                table.getName(), colDef.getName(), colDef.getType(), pos));
+      }
+      if (!colDefs.hasNext() && avroSchemaCols.hasNext()) {
+        Column avroSchemaCol = avroSchemaCols.next();
+        throw new AnalysisException(
+            String.format(AVRO_SCHEMA_MSG_PREFIX +
+                "\nMissing column definition corresponding to Avro-schema " +
+                "column '%s' of type '%s' at position '%s'.\n" +
+                AVRO_SCHEMA_MSG_SUFFIX,
+                table.getName(), avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
+      }
+      ++pos;
+    }
+  }
+
+  public String getTblStatsQuery() { return tableStatsQueryStr_; }
+  public String getColStatsQuery() { return columnStatsQueryStr_; }
+
+  @Override
+  public String toSql() {
+    if (!isIncremental_) {
+      return "COMPUTE STATS " + tableName_.toSql();
+    } else {
+      return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() +
+          partitionSpec_ == null ? "" : partitionSpec_.toSql();
+    }
+  }
+
+  public TComputeStatsParams toThrift() {
+    TComputeStatsParams params = new TComputeStatsParams();
+    params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName()));
+    params.setTbl_stats_query(tableStatsQueryStr_);
+    if (columnStatsQueryStr_ != null) {
+      params.setCol_stats_query(columnStatsQueryStr_);
+    } else {
+      params.setCol_stats_queryIsSet(false);
+    }
+
+    params.setIs_incremental(isIncremental_);
+    params.setExisting_part_stats(validPartStats_);
+    params.setExpect_all_partitions(expectAllPartitions_);
+    if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_);
+    if (isIncremental_) {
+      params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
+    }
+    return params;
+  }
+}
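
In toSql() above, the parentheses around the partition-spec ternary matter because, in Java, string concatenation binds tighter than the conditional operator. A minimal, self-contained sketch of the difference (the class, table name, and partition spec are hypothetical, for illustration only and not part of the patch):

    // TernaryPrecedenceDemo.java -- illustration only.
    public class TernaryPrecedenceDemo {
      public static void main(String[] args) {
        String spec = " PARTITION (p=1)";
        // Without parentheses, '+' is evaluated first, so the whole concatenated
        // string becomes the operand of '== null' and the prefix is discarded.
        String broken = "COMPUTE INCREMENTAL STATS t" + spec == null ? "" : spec;
        // With parentheses, the ternary is evaluated first, then concatenated.
        String fixed = "COMPUTE INCREMENTAL STATS t" + (spec == null ? "" : spec);
        System.out.println(broken);  // " PARTITION (p=1)"
        System.out.println(fixed);   // "COMPUTE INCREMENTAL STATS t PARTITION (p=1)"
      }
    }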

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java
new file mode 100644
index 0000000..1ee6fd4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateDataSrcStmt.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.extdatasource.ApiVersion;
+import com.cloudera.impala.thrift.TCreateDataSourceParams;
+import com.cloudera.impala.thrift.TDataSource;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE DATA SOURCE statement.
+ */
+public class CreateDataSrcStmt extends StatementBase {
+  private final String dataSrcName_;
+  private final String className_;
+  private final String apiVersionString_;
+  private final HdfsUri location_;
+  private final boolean ifNotExists_;
+  private ApiVersion apiVersion_;
+
+  public CreateDataSrcStmt(String dataSrcName, HdfsUri location, String className,
+      String apiVersionString, boolean ifNotExists) {
+    Preconditions.checkNotNull(dataSrcName);
+    Preconditions.checkNotNull(className);
+    Preconditions.checkNotNull(apiVersionString);
+    Preconditions.checkNotNull(location);
+    dataSrcName_ = dataSrcName.toLowerCase();
+    location_ = location;
+    className_ = className;
+    apiVersionString_ = apiVersionString;
+    ifNotExists_ = ifNotExists;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (!MetaStoreUtils.validateName(dataSrcName_)) {
+      throw new AnalysisException("Invalid data source name: " + dataSrcName_);
+    }
+    if (!ifNotExists_ && analyzer.getCatalog().getDataSource(dataSrcName_) != 
null) {
+      throw new AnalysisException(Analyzer.DATA_SRC_ALREADY_EXISTS_ERROR_MSG +
+          dataSrcName_);
+    }
+
+    apiVersion_ = ApiVersion.parseApiVersion(apiVersionString_);
+    if (apiVersion_ == null) {
+      throw new AnalysisException("Invalid API version: '" + apiVersionString_ 
+
+          "'. Valid API versions: " + Joiner.on(", 
").join(ApiVersion.values()));
+    }
+
+    location_.analyze(analyzer, Privilege.ALL, FsAction.READ);
+    // TODO: Check class exists and implements API version
+    // TODO: authorization check
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE DATA SOURCE ");
+    if (ifNotExists_) sb.append("IF NOT EXISTS ");
+    sb.append(dataSrcName_);
+    sb.append(" LOCATION '");
+    sb.append(location_.getLocation());
+    sb.append("' CLASS '");
+    sb.append(className_);
+    sb.append("' API_VERSION '");
+    sb.append(apiVersion_.name());
+    sb.append("'");
+    return sb.toString();
+  }
+
+  public TCreateDataSourceParams toThrift() {
+    return new TCreateDataSourceParams(
+        new TDataSource(dataSrcName_, location_.toString(), className_,
+            apiVersion_.name())).setIf_not_exists(ifNotExists_);
+  }
+}
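
For readers unfamiliar with the statement, a hypothetical usage sketch of the class above; the jar path, class name, and API version string are made up, HdfsUri is assumed to accept the path string in its constructor, and toSql() is only meaningful after analyze() has resolved apiVersion_:

    // Illustration only -- constructs the statement and shows the SQL it renders.
    CreateDataSrcStmt stmt = new CreateDataSrcStmt(
        "my_source",                                         // stored lower-cased
        new HdfsUri("/user/impala/udfs/my-data-source.jar"), // assumed constructor
        "com.example.MyExternalDataSource",
        "V1",
        /* ifNotExists */ true);
    // After analysis, stmt.toSql() produces (on one line):
    //   CREATE DATA SOURCE IF NOT EXISTS my_source
    //       LOCATION '/user/impala/udfs/my-data-source.jar'
    //       CLASS 'com.example.MyExternalDataSource' API_VERSION 'V1'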

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java
new file mode 100644
index 0000000..3dedd8b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TCreateDbParams;
+
+/**
+ * Represents a CREATE DATABASE statement
+ */
+public class CreateDbStmt extends StatementBase {
+  private final String dbName_;
+  private final HdfsUri location_;
+  private final String comment_;
+  private final boolean ifNotExists_;
+
+  /**
+   * Creates a database with the given name.
+   */
+  public CreateDbStmt(String dbName) {
+    this(dbName, null, null, false);
+  }
+
+  /**
+   * Creates a database with the given name, comment, and HDFS table storage location.
+   * New tables created in the database inherit the location property as their default
+   * storage location. CREATE DATABASE throws an error if the database already exists,
+   * unless ifNotExists is true.
+   */
+  public CreateDbStmt(String dbName, String comment, HdfsUri location,
+      boolean ifNotExists) {
+    this.dbName_ = dbName;
+    this.comment_ = comment;
+    this.location_ = location;
+    this.ifNotExists_ = ifNotExists;
+  }
+
+  public String getComment() { return comment_; }
+  public String getDb() { return dbName_; }
+  public boolean getIfNotExists() { return ifNotExists_; }
+  public HdfsUri getLocation() { return location_; }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder("CREATE DATABASE");
+    if (ifNotExists_) sb.append(" IF NOT EXISTS");
+    sb.append(" " + dbName_);
+    if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'");
+    if (location_ != null) sb.append(" LOCATION '" + location_ + "'");
+    return sb.toString();
+  }
+
+  public TCreateDbParams toThrift() {
+    TCreateDbParams params = new TCreateDbParams();
+    params.setDb(getDb());
+    params.setComment(getComment());
+    params.setLocation(location_ == null ? null : location_.toString());
+    params.setIf_not_exists(getIfNotExists());
+    return params;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    // Check whether the db name meets the Metastore's requirements.
+    if (!MetaStoreUtils.validateName(dbName_)) {
+      throw new AnalysisException("Invalid database name: " + dbName_);
+    }
+
+    // Note: It is possible that a database with the same name was created external to
+    // this Impala instance. If that happens, the caller will not get an
+    // AnalysisException when creating the database; instead, they will get a Hive
+    // AlreadyExistsException once the request has been sent to the metastore.
+    Db db = analyzer.getDb(getDb(), Privilege.CREATE, false);
+    if (db != null && !ifNotExists_) {
+      throw new AnalysisException(Analyzer.DB_ALREADY_EXISTS_ERROR_MSG + getDb());
+    }
+
+    if (location_ != null) {
+      location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    }
+  }
+}
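
A small sketch of the SQL that toSql() above renders; the database name, comment, and location are hypothetical, and HdfsUri is again assumed to accept the path string:

    // Illustration only.
    CreateDbStmt stmt = new CreateDbStmt(
        "analytics", "nightly reporting", new HdfsUri("/warehouse/analytics.db"), true);
    // stmt.toSql() produces (on one line):
    //   CREATE DATABASE IF NOT EXISTS analytics
    //       COMMENT 'nightly reporting' LOCATION '/warehouse/analytics.db'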

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java
new file mode 100644
index 0000000..ef90b8a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import com.cloudera.impala.catalog.Role;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TCreateDropRoleParams;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a "CREATE ROLE" or "DROP ROLE" statement.
+ */
+public class CreateDropRoleStmt extends AuthorizationStmt {
+  private final String roleName_;
+  private final boolean isDropRole_;
+
+  // Set in analysis
+  private String user_;
+
+  public CreateDropRoleStmt(String roleName, boolean isDropRole) {
+    Preconditions.checkNotNull(roleName);
+    roleName_ = roleName;
+    isDropRole_ = isDropRole;
+  }
+
+  @Override
+  public String toSql() {
+    return String.format("%s ROLE %s", roleName_, isDropRole_ ? "DROP" : 
"CREATE");
+  }
+
+  public TCreateDropRoleParams toThrift() {
+    TCreateDropRoleParams params = new TCreateDropRoleParams();
+    params.setRole_name(roleName_);
+    params.setIs_drop(isDropRole_);
+    return params;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    Role existingRole = analyzer.getCatalog().getAuthPolicy().getRole(roleName_);
+    if (isDropRole_ && existingRole == null) {
+      throw new AnalysisException(String.format("Role '%s' does not exist.", 
roleName_));
+    } else if (!isDropRole_ && existingRole != null) {
+      throw new AnalysisException(String.format("Role '%s' already exists.", 
roleName_));
+    }
+  }
+}
\ No newline at end of file
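
A quick sketch of the SQL that toSql() above renders, with the role keyword first and the role name second (the role name is hypothetical):

    // Illustration only.
    CreateDropRoleStmt createRole = new CreateDropRoleStmt("analyst", /* isDropRole */ false);
    CreateDropRoleStmt dropRole = new CreateDropRoleStmt("analyst", /* isDropRole */ true);
    // createRole.toSql() -> "CREATE ROLE analyst"
    // dropRole.toSql()   -> "DROP ROLE analyst"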

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java
new file mode 100644
index 0000000..ebfd7b6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateFunctionStmtBase.java
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.fs.permission.FsAction;
+
+import com.cloudera.impala.authorization.AuthorizeableFn;
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.authorization.PrivilegeRequest;
+import com.cloudera.impala.catalog.Catalog;
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.Function;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TCreateFunctionParams;
+import com.cloudera.impala.thrift.TFunctionBinaryType;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for CREATE [AGGREGATE] FUNCTION statements (UDFs and UDAs).
+ */
+public abstract class CreateFunctionStmtBase extends StatementBase {
+
+  // Enums for valid keys for optional arguments.
+  public enum OptArg {
+    COMMENT,
+    SYMBOL,           // Only used for Udfs
+    PREPARE_FN,       // Only used for Udfs
+    CLOSE_FN,         // Only used for Udfs
+    UPDATE_FN,        // Only used for Udas
+    INIT_FN,          // Only used for Udas
+    SERIALIZE_FN,     // Only used for Udas
+    MERGE_FN,         // Only used for Udas
+    FINALIZE_FN       // Only used for Udas
+  };
+
+  protected final FunctionName fnName_;
+  protected final FunctionArgs args_;
+  protected final TypeDef retTypeDef_;
+  protected final HdfsUri location_;
+  protected final HashMap<CreateFunctionStmtBase.OptArg, String> optArgs_;
+  protected final boolean ifNotExists_;
+
+  // Result of analysis.
+  protected Function fn_;
+
+  // Db object for function fn_. Set in analyze().
+  protected Db db_;
+
+  // Set in analyze()
+  protected String sqlString_;
+
+  protected CreateFunctionStmtBase(FunctionName fnName, FunctionArgs args,
+      TypeDef retTypeDef, HdfsUri location, boolean ifNotExists,
+      HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) {
+    // The return and arg types must either be both null or non-null.
+    Preconditions.checkState(!(args == null ^ retTypeDef == null));
+    fnName_ = fnName;
+    args_ = args;
+    retTypeDef_ = retTypeDef;
+    location_ = location;
+    ifNotExists_ = ifNotExists;
+    optArgs_ = optArgs;
+  }
+
+  public String getComment() { return optArgs_.get(OptArg.COMMENT); }
+  public boolean getIfNotExists() { return ifNotExists_; }
+  public boolean hasSignature() { return args_ != null; }
+
+  public TCreateFunctionParams toThrift() {
+    TCreateFunctionParams params = new TCreateFunctionParams(fn_.toThrift());
+    params.setIf_not_exists(getIfNotExists());
+    params.setFn(fn_.toThrift());
+    return params;
+  }
+
+  // Returns optArg[key], first validating that it is set.
+  protected String checkAndGetOptArg(OptArg key)
+      throws AnalysisException {
+    if (!optArgs_.containsKey(key)) {
+      throw new AnalysisException("Argument '" + key + "' must be set.");
+    }
+    return optArgs_.get(key);
+  }
+
+  protected void checkOptArgNotSet(OptArg key)
+      throws AnalysisException {
+    if (optArgs_.containsKey(key)) {
+      throw new AnalysisException("Optional argument '" + key + "' should not 
be set.");
+    }
+  }
+
+  // Returns the function's binary type based on the path extension.
+  private TFunctionBinaryType getBinaryType() throws AnalysisException {
+    TFunctionBinaryType binaryType = null;
+    String binaryPath = fn_.getLocation().getLocation();
+    int suffixIndex = binaryPath.lastIndexOf(".");
+    if (suffixIndex != -1) {
+      String suffix = binaryPath.substring(suffixIndex + 1);
+      if (suffix.equalsIgnoreCase("jar")) {
+        binaryType = TFunctionBinaryType.JAVA;
+      } else if (suffix.equalsIgnoreCase("so")) {
+        binaryType = TFunctionBinaryType.NATIVE;
+      } else if (suffix.equalsIgnoreCase("ll")) {
+        binaryType = TFunctionBinaryType.IR;
+      }
+    }
+    if (binaryType == null) {
+      throw new AnalysisException("Unknown binary type: '" + binaryPath +
+          "'. Binary must end in .jar, .so or .ll");
+    }
+    return binaryType;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    // Validate function name is legal
+    fnName_.analyze(analyzer);
+
+    if (hasSignature()) {
+      // Validate function arguments and return type.
+      args_.analyze(analyzer);
+      retTypeDef_.analyze(analyzer);
+      fn_ = createFunction(fnName_, args_.getArgTypes(), retTypeDef_.getType(),
+          args_.hasVarArgs());
+    } else {
+      fn_ = createFunction(fnName_, null, null, false);
+    }
+
+    // For now, if authorization is enabled, the user needs ALL on the server
+    // to create functions.
+    // TODO: this is not the right granularity but acceptable for now.
+    analyzer.registerPrivReq(new PrivilegeRequest(
+        new AuthorizeableFn(fn_.signatureString()), Privilege.ALL));
+
+    Db builtinsDb = analyzer.getCatalog().getDb(Catalog.BUILTINS_DB);
+    if (builtinsDb.containsFunction(fn_.getName())) {
+      throw new AnalysisException("Function cannot have the same name as a 
builtin: " +
+          fn_.getFunctionName().getFunction());
+    }
+
+    db_ = analyzer.getDb(fn_.dbName(), Privilege.CREATE);
+    Function existingFn = db_.getFunction(fn_, Function.CompareMode.IS_INDISTINGUISHABLE);
+    if (existingFn != null && !ifNotExists_) {
+      throw new AnalysisException(Analyzer.FN_ALREADY_EXISTS_ERROR_MSG +
+          existingFn.signatureString());
+    }
+
+    location_.analyze(analyzer, Privilege.CREATE, FsAction.READ);
+    fn_.setLocation(location_);
+
+    // Check the file type from the binary type to infer the type of the UDA
+    fn_.setBinaryType(getBinaryType());
+
+    // Forbid unsupported and complex types.
+    if (hasSignature()) {
+      List<Type> refdTypes = Lists.newArrayList(fn_.getReturnType());
+      refdTypes.addAll(Lists.newArrayList(fn_.getArgs()));
+      for (Type t: refdTypes) {
+        if (!t.isSupported() || t.isComplexType()) {
+          throw new AnalysisException(
+              String.format("Type '%s' is not supported in UDFs/UDAs.", 
t.toSql()));
+        }
+      }
+    } else if (fn_.getBinaryType() != TFunctionBinaryType.JAVA) {
+      throw new AnalysisException(
+          String.format("Native functions require a return type and/or " +
+              "argument types: %s", fn_.getFunctionName()));
+    }
+
+    // Check if the function can be persisted. We persist all native/IR functions
+    // and also JAVA functions added without signature. Only JAVA functions added
+    // with signatures aren't persisted.
+    if (getBinaryType() == TFunctionBinaryType.JAVA && hasSignature()) {
+      fn_.setIsPersistent(false);
+    } else {
+      fn_.setIsPersistent(true);
+    }
+  }
+
+  /**
+   * Creates a concrete function.
+   */
+  protected abstract Function createFunction(FunctionName fnName,
+      ArrayList<Type> argTypes, Type retType, boolean hasVarArgs);
+}
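
The binary-type inference in getBinaryType() is driven purely by the file extension of the function's LOCATION. A standalone restatement of that dispatch, shown only to make the mapping explicit (a hypothetical helper; the real method reads the path from fn_.getLocation() and throws an AnalysisException for unknown extensions):

    // Illustration only.
    static TFunctionBinaryType binaryTypeForPath(String binaryPath) {
      int dot = binaryPath.lastIndexOf('.');
      String suffix = dot == -1 ? "" : binaryPath.substring(dot + 1);
      if (suffix.equalsIgnoreCase("jar")) return TFunctionBinaryType.JAVA;   // Java UDFs
      if (suffix.equalsIgnoreCase("so")) return TFunctionBinaryType.NATIVE;  // compiled C++
      if (suffix.equalsIgnoreCase("ll")) return TFunctionBinaryType.IR;      // LLVM IR
      return null;  // caller reports "Unknown binary type: ..."
    }

Combined with the persistence rule at the end of analyze(), only Java functions created with an explicit signature end up marked non-persistent.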

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
new file mode 100644
index 0000000..cc04b04
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TCreateOrAlterViewParams;
+import com.cloudera.impala.thrift.TTableName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Base class for CREATE VIEW and ALTER VIEW AS SELECT statements.
+ */
+public abstract class CreateOrAlterViewStmtBase extends StatementBase {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(CreateOrAlterViewStmtBase.class);
+
+  protected final boolean ifNotExists_;
+  protected final TableName tableName_;
+  protected final ArrayList<ColumnDef> columnDefs_;
+  protected final String comment_;
+  protected final QueryStmt viewDefStmt_;
+
+  // Set during analysis
+  protected String dbName_;
+  protected String owner_;
+
+  // The original SQL-string given as view definition. Set during analysis.
+  // Corresponds to Hive's viewOriginalText.
+  protected String originalViewDef_;
+
+  // Query statement (as SQL string) that defines the View for view substitution.
+  // It is a transformation of the original view definition, e.g., to enforce the
+  // columnDefs even if the original view definition has explicit column aliases.
+  // If column definitions were given, then this "expanded" view definition
+  // wraps the original view definition in a select stmt as follows.
+  //
+  // SELECT viewName.origCol1 AS colDesc1, viewName.origCol2 AS colDesc2, ...
+  // FROM (originalViewDef) AS viewName
+  //
+  // Corresponds to Hive's viewExpandedText, but is not identical to the SQL
+  // Hive would produce in view creation.
+  protected String inlineViewDef_;
+
+  // Columns to use in the select list of the expanded SQL string and when registering
+  // this view in the metastore. Set in analysis.
+  protected ArrayList<ColumnDef> finalColDefs_;
+
+  public CreateOrAlterViewStmtBase(boolean ifNotExists, TableName tableName,
+      ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) {
+    Preconditions.checkNotNull(tableName);
+    Preconditions.checkNotNull(viewDefStmt);
+    this.ifNotExists_ = ifNotExists;
+    this.tableName_ = tableName;
+    this.columnDefs_ = columnDefs;
+    this.comment_ = comment;
+    this.viewDefStmt_ = viewDefStmt;
+  }
+
+  /**
+   * Sets the originalViewDef and the expanded inlineViewDef based on viewDefStmt.
+   * If columnDefs were given, checks that they do not contain duplicate column names
+   * and throws an exception if they do.
+   */
+  protected void createColumnAndViewDefs(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(dbName_);
+    Preconditions.checkNotNull(owner_);
+
+    // Set the finalColDefs to reflect the given column definitions.
+    if (columnDefs_ != null) {
+      Preconditions.checkState(!columnDefs_.isEmpty());
+      if (columnDefs_.size() != viewDefStmt_.getColLabels().size()) {
+        String cmp =
+            (columnDefs_.size() > viewDefStmt_.getColLabels().size()) ? "more" : "fewer";
+        throw new AnalysisException(String.format("Column-definition list has " +
+            "%s columns (%s) than the view-definition query statement returns (%s).",
+            cmp, columnDefs_.size(), viewDefStmt_.getColLabels().size()));
+      }
+
+      finalColDefs_ = columnDefs_;
+      Preconditions.checkState(
+          columnDefs_.size() == viewDefStmt_.getBaseTblResultExprs().size());
+      for (int i = 0; i < columnDefs_.size(); ++i) {
+        // Set type in the column definition from the view-definition statement.
+        columnDefs_.get(i).setType(viewDefStmt_.getBaseTblResultExprs().get(i).getType());
+      }
+    } else {
+      // Create list of column definitions from the view-definition statement.
+      finalColDefs_ = Lists.newArrayList();
+      List<Expr> exprs = viewDefStmt_.getBaseTblResultExprs();
+      List<String> labels = viewDefStmt_.getColLabels();
+      Preconditions.checkState(exprs.size() == labels.size());
+      for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) {
+        ColumnDef colDef = new ColumnDef(labels.get(i), null, null);
+        colDef.setType(exprs.get(i).getType());
+        finalColDefs_.add(colDef);
+      }
+    }
+
+    // Check that the column definitions have valid names, and that there are no
+    // duplicate column names.
+    Set<String> distinctColNames = Sets.newHashSet();
+    for (ColumnDef colDesc: finalColDefs_) {
+      colDesc.analyze();
+      if (!distinctColNames.add(colDesc.getColName().toLowerCase())) {
+        throw new AnalysisException("Duplicate column name: " + 
colDesc.getColName());
+      }
+    }
+
+    // Set original and expanded view-definition SQL strings.
+    originalViewDef_ = viewDefStmt_.toSql();
+
+    // If no column definitions were given, then the expanded view SQL is the same
+    // as the original one.
+    if (columnDefs_ == null) {
+      inlineViewDef_ = originalViewDef_;
+      return;
+    }
+
+    // Wrap the original view-definition statement into a SELECT to enforce the
+    // given column definitions.
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    for (int i = 0; i < finalColDefs_.size(); ++i) {
+      String colRef = ToSqlUtils.getIdentSql(viewDefStmt_.getColLabels().get(i));
+      String colAlias = ToSqlUtils.getIdentSql(finalColDefs_.get(i).getColName());
+      sb.append(String.format("%s.%s AS %s", tableName_.getTbl(), colRef, 
colAlias));
+      sb.append((i+1 != finalColDefs_.size()) ? ", " : "");
+    }
+    // Do not use 'AS' for table aliases because Hive only accepts them without 'AS'.
+    sb.append(String.format(" FROM (%s) %s", originalViewDef_, tableName_.getTbl()));
+    inlineViewDef_ = sb.toString();
+  }
+
+  /**
+   * Computes the column lineage graph for a create/alter view statement.
+   */
+  protected void computeLineageGraph(Analyzer analyzer) {
+    ColumnLineageGraph graph = analyzer.getColumnLineageGraph();
+    List<String> colDefs = Lists.newArrayList();
+    for (ColumnDef colDef: finalColDefs_) {
+      colDefs.add(dbName_ + "." + getTbl() + "." + colDef.getColName());
+    }
+    graph.addTargetColumnLabels(colDefs);
+    graph.computeLineageGraph(viewDefStmt_.getResultExprs(), analyzer);
+    LOG.trace("lineage: " + graph.debugString());
+  }
+
+  public TCreateOrAlterViewParams toThrift() {
+    TCreateOrAlterViewParams params = new TCreateOrAlterViewParams();
+    params.setView_name(new TTableName(getDb(), getTbl()));
+    for (ColumnDef col: finalColDefs_) {
+      params.addToColumns(col.toThrift());
+    }
+    params.setOwner(getOwner());
+    params.setIf_not_exists(getIfNotExists());
+    params.setOriginal_view_def(originalViewDef_);
+    params.setExpanded_view_def(inlineViewDef_);
+    if (comment_ != null) params.setComment(comment_);
+    return params;
+  }
+
+  /**
+   * Can only be called after analysis; returns the name of the database the table will
+   * be created within.
+   */
+  public String getDb() {
+    Preconditions.checkNotNull(dbName_);
+    return dbName_;
+  }
+
+  /**
+   * Can only be called after analysis; returns the owner of the view to be created.
+   */
+  public String getOwner() {
+    Preconditions.checkNotNull(owner_);
+    return owner_;
+  }
+
+  public List<ColumnDef> getColumnDescs() { return columnDefs_; }
+  public String getComment() { return comment_; }
+  public boolean getIfNotExists() { return ifNotExists_; }
+  public String getOriginalViewDef() { return originalViewDef_; }
+  public String getInlineViewDef() { return inlineViewDef_; }
+  public String getTbl() { return tableName_.getTbl(); }
+}
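
The inline-view expansion built in createColumnAndViewDefs() is easiest to see on a concrete example. The sketch below mirrors the SELECT-wrapping logic with hypothetical names, minus the ToSqlUtils.getIdentSql() identifier quoting that the real code applies:

    // Illustration only.
    static String expandViewDef(String viewName, String[] origLabels, String[] colNames,
        String originalViewDef) {
      StringBuilder sb = new StringBuilder("SELECT ");
      for (int i = 0; i < colNames.length; ++i) {
        if (i > 0) sb.append(", ");
        sb.append(String.format("%s.%s AS %s", viewName, origLabels[i], colNames[i]));
      }
      // No 'AS' before the table alias, for Hive compatibility.
      sb.append(String.format(" FROM (%s) %s", originalViewDef, viewName));
      return sb.toString();
    }
    // expandViewDef("v", new String[] {"c1", "c2"}, new String[] {"a", "b"},
    //     "SELECT c1, c2 FROM t")
    //   -> "SELECT v.c1 AS a, v.c2 AS b FROM (SELECT c1, c2 FROM t) v"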

