IMPALA-3725 Support Kudu UPSERT in Impala

This patch introduces a new query statement, UPSERT, for Kudu
tables. UPSERT operates like an INSERT and uses all of the same
analysis, planning, and execution machinery as INSERT, except
that on a primary key collision it performs an update instead
of returning an error.

New syntax:
[with_clause] UPSERT INTO [TABLE] table_name [(column list)]
{
  query_stmt
 | VALUES (value [, value...]) [, (value [, value...]) ...]
}

where the column list, if specified, must contain all of the
primary key columns of table_name, and table_name must be a Kudu
table.
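
For example (an illustrative sketch using the
functional_kudu.testtbl table from the tests below, whose primary
key column is 'id' and whose other columns are 'name' and 'zip'):

  -- Inserts the row if id=1 does not exist; otherwise updates
  -- the existing row's name and zip values.
  UPSERT INTO functional_kudu.testtbl (id, name, zip)
  VALUES (1, 'a', 1);

  -- UPSERT from a query, optionally preceded by a WITH clause.
  WITH t1 AS (SELECT * FROM functional.alltypes)
  UPSERT INTO functional_kudu.testtbl
  SELECT bigint_col, string_col, int_col FROM t1;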

This patch also improves the behavior of INSERTs into Kudu
tables that do not specify all of the primary key columns: this
now results in an analysis exception rather than attempting the
INSERT and receiving an error back from Kudu.
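
For example, an INSERT that omits a key column now fails at
analysis time (again using functional_kudu.testtbl, whose only
key column is 'id'):

  INSERT INTO functional_kudu.testtbl (zip) VALUES (1);
  -- Analysis error: All primary key columns must be specified for
  -- INSERTing into Kudu tables. Missing columns are: id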

Change-Id: I8df5cea36b642e267f85ff6b163f3dd96b8386e9
Reviewed-on: http://gerrit.cloudera.org:8080/4047
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/832fb537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/832fb537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/832fb537

Branch: refs/heads/master
Commit: 832fb537635fb695dcc85d4e27dd3b7646a7e6ab
Parents: e9a4077
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Thu Aug 18 09:57:13 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Nov 5 04:16:54 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc                  |  15 +-
 common/thrift/DataSinks.thrift                  |   1 +
 fe/src/main/cup/sql-parser.cup                  |  44 +++-
 .../analysis/CreateTableAsSelectStmt.java       |   4 +-
 .../org/apache/impala/analysis/InsertStmt.java  | 223 ++++++++++++++-----
 .../apache/impala/planner/KuduTableSink.java    |  16 +-
 .../org/apache/impala/planner/TableSink.java    |   9 +
 fe/src/main/jflex/sql-scanner.flex              |   1 +
 .../impala/analysis/AnalyzeStmtsTest.java       |  12 +-
 .../impala/analysis/AnalyzeSubqueriesTest.java  |   8 +
 .../impala/analysis/AnalyzeUpsertStmtTest.java  | 132 +++++++++++
 .../apache/impala/analysis/AnalyzerTest.java    |   8 +
 .../org/apache/impala/analysis/ParserTest.java  |  79 ++++++-
 .../org/apache/impala/analysis/ToSqlTest.java   |  30 +++
 .../org/apache/impala/planner/PlannerTest.java  |   6 +
 .../queries/PlannerTest/kudu-upsert.test        |  92 ++++++++
 .../queries/QueryTest/kudu_crud.test            |  69 ++++++
 17 files changed, 665 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index a9beb29..fc26a53 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -167,6 +167,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
     return table_->NewInsert();
   } else if (sink_action_ == TSinkAction::UPDATE) {
     return table_->NewUpdate();
+  } else if (sink_action_ == TSinkAction::UPSERT) {
+    return table_->NewUpsert();
   } else {
     DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: "
         << sink_action_;
@@ -190,17 +192,24 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
     unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
+      // For INSERT, output_expr_ctxs_ will contain all columns of the table in order.
+      // For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns that the op
+      // applies to, i.e. columns explicitly mentioned in the query, and
+      // referenced_columns is then used to map to actual column positions.
       int col = kudu_table_sink_.referenced_columns.empty() ?
           j : kudu_table_sink_.referenced_columns[j];
 
       void* value = output_expr_ctxs_[j]->GetValue(current_row);
 
-      // If the value is NULL and no explicit column references are provided, the column
-      // should be ignored, else it's explicitly set to NULL.
+      // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT.
+      // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL.
       if (value == NULL) {
-        if (!kudu_table_sink_.referenced_columns.empty()) {
+        if (sink_action_ == TSinkAction::UPDATE || sink_action_ == TSinkAction::UPSERT) {
+          DCHECK(!kudu_table_sink_.referenced_columns.empty());
           KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),
               "Could not add Kudu WriteOp.");
+        } else {
+          DCHECK(kudu_table_sink_.referenced_columns.empty());
         }
         continue;
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 83c63b7..26f74cf 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -33,6 +33,7 @@ enum TDataSinkType {
 enum TSinkAction {
   INSERT,
   UPDATE,
+  UPSERT,
   DELETE
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 836bc2d..c0865c4 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -262,8 +262,8 @@ terminal
   KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES,
   KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, 
KW_TINYINT,
   KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, 
KW_UPDATE,
-  KW_UPDATE_FN, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, 
KW_WHERE,
-  KW_WITH;
+  KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, 
KW_WHEN,
+  KW_WHERE, KW_WITH;
 
 terminal COLON, SEMICOLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, 
LBRACKET,
   RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
@@ -361,7 +361,7 @@ nonterminal ArrayList<String> opt_plan_hints;
 nonterminal TypeDef type_def;
 nonterminal Type type;
 nonterminal Expr sign_chain_expr;
-nonterminal InsertStmt insert_stmt;
+nonterminal InsertStmt insert_stmt, upsert_stmt;
 nonterminal UpdateStmt update_stmt;
 nonterminal DeleteStmt delete_stmt;
 nonterminal ArrayList<Pair<SlotRef, Expr>> update_set_expr_list;
@@ -521,6 +521,8 @@ stmt ::=
   {: RESULT = insert; :}
   | update_stmt:update
   {: RESULT = update; :}
+  | upsert_stmt:upsert
+  {: RESULT = upsert; :}
   | delete_stmt:delete
   {: RESULT = delete; :}
   | use_stmt:use
@@ -659,6 +661,11 @@ explain_stmt ::=
      update.setIsExplain();
      RESULT = update;
   :}
+  | KW_EXPLAIN upsert_stmt:upsert
+  {:
+     upsert.setIsExplain();
+     RESULT = upsert;
+  :}
   | KW_EXPLAIN delete_stmt:delete
   {:
      delete.setIsExplain();
@@ -673,19 +680,29 @@ insert_stmt ::=
   opt_with_clause:w KW_INSERT KW_OVERWRITE opt_ignore:ignore opt_kw_table 
table_name:table
   LPAREN opt_ident_list:col_perm RPAREN partition_clause:list 
opt_plan_hints:hints
   opt_query_stmt:query
-  {: RESULT = new InsertStmt(w, table, true, list, hints, query, col_perm, 
ignore); :}
+  {:
+    RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, 
col_perm,
+        ignore);
+  :}
   | opt_with_clause:w KW_INSERT KW_OVERWRITE
   opt_ignore:ignore opt_kw_table table_name:table
   partition_clause:list opt_plan_hints:hints query_stmt:query
-  {: RESULT = new InsertStmt(w, table, true, list, hints, query, null, 
ignore); :}
+  {:
+    RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, null, 
ignore);
+  :}
   | opt_with_clause:w KW_INSERT opt_ignore:ignore KW_INTO
   opt_kw_table table_name:table LPAREN opt_ident_list:col_perm RPAREN
   partition_clause:list opt_plan_hints:hints opt_query_stmt:query
-  {: RESULT = new InsertStmt(w, table, false, list, hints, query, col_perm, 
ignore); :}
+  {:
+    RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, 
col_perm,
+        ignore);
+  :}
   | opt_with_clause:w KW_INSERT
   opt_ignore:ignore KW_INTO opt_kw_table table_name:table partition_clause:list
   opt_plan_hints:hints query_stmt:query
-  {: RESULT = new InsertStmt(w, table, false, list, hints, query, null, 
ignore); :}
+  {:
+    RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, 
null, ignore);
+  :}
   ;
 
 // Update statements have an optional WHERE and optional FROM clause.
@@ -717,6 +734,17 @@ update_set_expr_list ::=
   :}
   ;
 
+// Upsert statements have an optional column permutation clause. If the column 
permutation
+// is present, the query statement clause is optional as well.
+upsert_stmt ::=
+  opt_with_clause:w KW_UPSERT KW_INTO opt_kw_table table_name:table
+    LPAREN opt_ident_list:col_perm RPAREN opt_plan_hints:hints 
opt_query_stmt:query
+  {: RESULT = InsertStmt.createUpsert(w, table, hints, query, col_perm); :}
+  | opt_with_clause:w KW_UPSERT KW_INTO opt_kw_table table_name:table
+    opt_plan_hints:hints query_stmt:query
+  {: RESULT = InsertStmt.createUpsert(w, table, hints, query, null); :}
+  ;
+
 // A DELETE statement comes in two main representations, the DELETE keyword 
with a path
 // specification as the target table with an optional FROM keyword or the 
DELETE
 // keyword followed by a table alias or reference and a full FROM clause. In 
all cases
@@ -3217,6 +3245,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_UPDATE_FN:r
   {: RESULT = r.toString(); :}
+  | KW_UPSERT:r
+  {: RESULT = r.toString(); :}
   | KW_USE:r
   {: RESULT = r.toString(); :}
   | KW_USING:r

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 2f5f166..2eb399f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -80,8 +80,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
         pkvs.add(new PartitionKeyValue(key, null));
       }
     }
-    insertStmt_ = new InsertStmt(null, createStmt.getTblName(), false, pkvs,
-        null, queryStmt, null, false);
+    insertStmt_ = InsertStmt.createInsert(
+        null, createStmt.getTblName(), false, pkvs, null, queryStmt, null, 
false);
   }
 
   public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 5e457a3..da360b1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
- * Representation of a single insert statement, including the select statement
+ * Representation of a single insert or upsert statement, including the select statement
  * whose results are to be inserted.
  */
 public class InsertStmt extends StatementBase {
@@ -70,20 +70,22 @@ public class InsertStmt extends StatementBase {
   // auto-generate one (for insert into tbl()) during analysis.
   private final boolean needsGeneratedQueryStatement_;
 
-  // The column permutation is specified by writing INSERT INTO tbl(col3, 
col1, col2...)
+  // The column permutation is specified by writing:
+  //     (INSERT|UPSERT) INTO tbl(col3, col1, col2...)
   //
   // It is a mapping from select-list expr index to (non-partition) output 
column. If
   // null, will be set to the default permutation of all non-partition columns 
in Hive
-  // order.
+  // order or all columns for Kudu tables.
   //
   // A column is said to be 'mentioned' if it occurs either in the column 
permutation, or
   // the PARTITION clause. If columnPermutation is null, all non-partition 
columns are
   // considered mentioned.
   //
-  // Between them, the columnPermutation and the set of partitionKeyValues 
must mention to
+  // Between them, the columnPermutation and the set of partitionKeyValues 
must mention
   // every partition column in the target table exactly once. Other columns, 
if not
-  // explicitly mentioned, will be assigned NULL values. Partition columns are 
not
-  // defaulted to NULL by design, and are not just for NULL-valued partition 
slots.
+  // explicitly mentioned, will be assigned NULL values for INSERTs and left 
unassigned
+  // for UPSERTs. Partition columns are not defaulted to NULL by design, and 
are not just
+  // for NULL-valued partition slots.
   //
   // Dynamic partition keys may occur in either the permutation or the 
PARTITION
   // clause. Partition columns with static values may only be mentioned in the 
PARTITION
@@ -123,11 +125,20 @@ public class InsertStmt extends StatementBase {
   private boolean hasClusteredHint_ = false;
 
   // Output expressions that produce the final results to write to the target 
table. May
-  // include casts, and NullLiterals where an output column isn't explicitly 
mentioned.
-  // Set in prepareExpressions(). The i'th expr produces the i'th column of 
the target
+  // include casts. Set in prepareExpressions().
+  // If this is an INSERT, will contain one Expr for all non-partition columns 
of the
+  // target table with NullLiterals where an output column isn't explicitly 
mentioned.
+  // The i'th expr produces the i'th column of the target table.
+  // If this is an UPSERT, will contain one Expr per column mentioned in the 
query and
+  // mentionedUpsertColumns_ is used to map between the Exprs and columns in 
the target
   // table.
   private ArrayList<Expr> resultExprs_ = Lists.newArrayList();
 
+  // Position mapping of exprs in resultExprs_ to columns in the target table -
+  // resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the 
target table.
+  // Only used for UPSERT, set in prepareExpressions().
+  private final List<Integer> mentionedUpsertColumns_ = Lists.newArrayList();
+
   // Set in analyze(). Exprs corresponding to key columns of Kudu tables. 
Empty for
   // non-Kudu tables.
   private ArrayList<Expr> primaryKeyExprs_ = Lists.newArrayList();
@@ -138,9 +149,29 @@ public class InsertStmt extends StatementBase {
   // For tables with primary keys, indicates if duplicate key errors are 
ignored.
   private final boolean ignoreDuplicates_;
 
-  public InsertStmt(WithClause withClause, TableName targetTable, boolean 
overwrite,
+  // True iff this is an UPSERT operation. Only supported for Kudu tables.
+  private final boolean isUpsert_;
+
+  public static InsertStmt createInsert(WithClause withClause, TableName 
targetTable,
+      boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
+      List<String> planHints, QueryStmt queryStmt, List<String> 
columnPermutation,
+      boolean ignoreDuplicates) {
+    return new InsertStmt(withClause, targetTable, overwrite, 
partitionKeyValues,
+        planHints, queryStmt, columnPermutation, ignoreDuplicates, false);
+  }
+
+  public static InsertStmt createUpsert(WithClause withClause, TableName 
targetTable,
+      List<String> planHints, QueryStmt queryStmt, List<String> 
columnPermutation) {
+    return new InsertStmt(withClause, targetTable, false, null, planHints, 
queryStmt,
+        columnPermutation, false, true);
+  }
+
+  protected InsertStmt(WithClause withClause, TableName targetTable, boolean 
overwrite,
       List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
-      QueryStmt queryStmt, List<String> columnPermutation, boolean 
ignoreDuplicates) {
+      QueryStmt queryStmt, List<String> columnPermutation, boolean 
ignoreDuplicates,
+      boolean isUpsert) {
+    Preconditions.checkState(!isUpsert || (!overwrite && !ignoreDuplicates &&
+        partitionKeyValues == null));
     withClause_ = withClause;
     targetTableName_ = targetTable;
     originalTableName_ = targetTableName_;
@@ -152,6 +183,7 @@ public class InsertStmt extends StatementBase {
     columnPermutation_ = columnPermutation;
     table_ = null;
     ignoreDuplicates_ = ignoreDuplicates;
+    isUpsert_ = isUpsert;
   }
 
   /**
@@ -170,6 +202,7 @@ public class InsertStmt extends StatementBase {
     columnPermutation_ = other.columnPermutation_;
     table_ = other.table_;
     ignoreDuplicates_ = other.ignoreDuplicates_;
+    isUpsert_ = other.isUpsert_;
   }
 
   @Override
@@ -184,6 +217,7 @@ public class InsertStmt extends StatementBase {
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
     resultExprs_.clear();
+    mentionedUpsertColumns_.clear();
     primaryKeyExprs_.clear();
   }
 
@@ -223,7 +257,7 @@ public class InsertStmt extends StatementBase {
 
     // Set target table and perform table-type specific analysis and auth 
checking.
     // Also checks if the target table is missing.
-    setTargetTable(analyzer);
+    analyzeTargetTable(analyzer);
 
     // Abort analysis if there are any missing tables beyond this point.
     if (!analyzer.getMissingTbls().isEmpty()) {
@@ -251,7 +285,8 @@ public class InsertStmt extends StatementBase {
     // Finally, prepareExpressions analyzes the expressions themselves, and 
confirms that
     // they are type-compatible with the target columns. Where columns are not 
mentioned
     // (and by this point, we know that missing columns are not partition 
columns),
-    // prepareExpressions assigns them a NULL literal expressions.
+    // prepareExpressions assigns them a NULL literal expressions, unless this 
is an
+    // UPSERT, in which case we don't want to overwrite unmentioned columns 
with NULL.
 
     // An null permutation clause is the same as listing all non-partition 
columns in
     // order.
@@ -332,13 +367,12 @@ public class InsertStmt extends StatementBase {
 
   /**
    * Sets table_ based on targetTableName_ and performs table-type specific 
analysis:
-   * - Partition clause is invalid for unpartitioned Hdfs tables and HBase 
tables
-   * - Overwrite is invalid for HBase tables
-   * - Check INSERT privileges as well as write access to Hdfs paths
-   * - Cannot insert into a view
+   * - Cannot (in|up)sert into a view
+   * - Cannot (in|up)sert into a table with unsupported column types
+   * - Analysis specific to insert and upsert operations
    * Adds table_ to the analyzer's descriptor table if analysis succeeds.
    */
-  private void setTargetTable(Analyzer analyzer) throws AnalysisException {
+  private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
     // If the table has not yet been set, load it from the Catalog. This 
allows for
     // callers to set a table to analyze that may not actually be created in 
the Catalog.
     // One example use case is CREATE TABLE AS SELECT which must run analysis 
on the
@@ -356,21 +390,42 @@ public class InsertStmt extends StatementBase {
           .allOf(Privilege.INSERT).toRequest());
     }
 
-    // We do not support inserting into views.
+    // We do not support (in|up)serting into views.
     if (table_ instanceof View) {
       throw new AnalysisException(
-          String.format("Impala does not support inserting into views: %s",
-          table_.getFullName()));
+          String.format("Impala does not support %sing into views: %s", 
getOpName(),
+              table_.getFullName()));
     }
 
+    // We do not support (in|up)serting into tables with unsupported column 
types.
     for (Column c: table_.getColumns()) {
       if (!c.getType().isSupported()) {
-        throw new AnalysisException(String.format("Unable to INSERT into 
target table " +
+        throw new AnalysisException(String.format("Unable to %s into target 
table " +
             "(%s) because the column '%s' has an unsupported type '%s'.",
-            targetTableName_, c.getName(), c.getType().toSql()));
+            getOpName(), targetTableName_, c.getName(), c.getType().toSql()));
       }
     }
 
+    // Perform operation-specific analysis.
+    if (isUpsert_) {
+      if (!(table_ instanceof KuduTable)) {
+        throw new AnalysisException("UPSERT is only supported for Kudu 
tables");
+      }
+    } else {
+      analyzeTableForInsert(analyzer);
+    }
+
+    // Add target table to descriptor table.
+    analyzer.getDescTbl().setTargetTable(table_);
+  }
+
+  /**
+   * Performs INSERT-specific table analysis:
+   * - Partition clause is invalid for unpartitioned or HBase tables
+   * - Check INSERT privileges as well as write access to Hdfs paths
+   * - Overwrite is invalid for HBase and Kudu tables
+   */
+  private void analyzeTableForInsert(Analyzer analyzer) throws 
AnalysisException {
     boolean isHBaseTable = (table_ instanceof HBaseTable);
     int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
 
@@ -433,48 +488,28 @@ public class InsertStmt extends StatementBase {
     if (isHBaseTable && overwrite_) {
       throw new AnalysisException("HBase doesn't have a way to perform INSERT 
OVERWRITE");
     }
-
-    // Add target table to descriptor table.
-    analyzer.getDescTbl().setTargetTable(table_);
   }
 
   /**
-   * Checks that the column permutation + select list + static partition exprs 
+
-   * dynamic partition exprs collectively cover exactly all columns in the 
target table
-   * (not more of fewer).
+   * Checks that the column permutation + select list + static partition exprs 
+ dynamic
+   * partition exprs collectively cover exactly all required columns in the 
target table,
+   * depending on the table type.
    */
   private void checkColumnCoverage(ArrayList<Column> selectExprTargetColumns,
       Set<String> mentionedColumnNames, int numSelectListExprs,
       int numStaticPartitionExprs) throws AnalysisException {
-    boolean isHBaseTable = (table_ instanceof HBaseTable);
-    int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
-    // Check that all columns are mentioned by the permutation and partition 
clauses
+    // Check that all required cols are mentioned by the permutation and 
partition clauses
     if (selectExprTargetColumns.size() + numStaticPartitionExprs !=
         table_.getColumns().size()) {
       // We've already ruled out too many columns in the permutation and 
partition clauses
       // by checking that there are no duplicates and that every column 
mentioned actually
-      // exists. So all columns aren't mentioned in the query. If the 
unmentioned columns
-      // include partition columns, this is an error.
-      List<String> missingColumnNames = Lists.newArrayList();
-      for (Column column: table_.getColumns()) {
-        if (!mentionedColumnNames.contains(column.getName())) {
-          // HBase tables have a single row-key column which is always in 
position 0. It
-          // must be mentioned, since it is invalid to set it to NULL (which 
would
-          // otherwise happen by default).
-          if (isHBaseTable && column.getPosition() == 0) {
-            throw new AnalysisException("Row-key column '" + column.getName() +
-                "' must be explicitly mentioned in column permutation.");
-          }
-          if (column.getPosition() < numClusteringCols) {
-            missingColumnNames.add(column.getName());
-          }
-        }
-      }
-
-      if (!missingColumnNames.isEmpty()) {
-        throw new AnalysisException(
-            "Not enough partition columns mentioned in query. Missing columns 
are: " +
-            Joiner.on(", ").join(missingColumnNames));
+      // exists. So all columns aren't mentioned in the query.
+      if (table_ instanceof KuduTable) {
+        checkRequiredKuduColumns(mentionedColumnNames);
+      } else if (table_ instanceof HBaseTable) {
+        checkRequiredHBaseColumns(mentionedColumnNames);
+      } else if (table_.getNumClusteringCols() > 0) {
+        checkRequiredPartitionedColumns(mentionedColumnNames);
       }
     }
 
@@ -507,6 +542,66 @@ public class InsertStmt extends StatementBase {
   }
 
   /**
+   * For a Kudu table, checks that all key columns are mentioned.
+   */
+  private void checkRequiredKuduColumns(Set<String> mentionedColumnNames)
+      throws AnalysisException {
+    Preconditions.checkState(table_ instanceof KuduTable);
+    List<String> keyColumns = ((KuduTable) table_).getPrimaryKeyColumnNames();
+    List<String> missingKeyColumnNames = Lists.newArrayList();
+    for (Column column : table_.getColumns()) {
+      if (!mentionedColumnNames.contains(column.getName())
+          && keyColumns.contains(column.getName())) {
+        missingKeyColumnNames.add(column.getName());
+      }
+    }
+
+    if (!missingKeyColumnNames.isEmpty()) {
+      throw new AnalysisException(String.format(
+          "All primary key columns must be specified for %sing into Kudu 
tables. " +
+          "Missing columns are: %s", getOpName(),
+          Joiner.on(", ").join(missingKeyColumnNames)));
+    }
+  }
+
+  /**
+   * For an HBase table, checks that the row-key column is mentioned.
+   * HBase tables have a single row-key column which is always in position 0. 
It
+   * must be mentioned, since it is invalid to set it to NULL (which would
+   * otherwise happen by default).
+   */
+  private void checkRequiredHBaseColumns(Set<String> mentionedColumnNames)
+      throws AnalysisException {
+    Preconditions.checkState(table_ instanceof HBaseTable);
+    Column column = table_.getColumns().get(0);
+    if (!mentionedColumnNames.contains(column.getName())) {
+      throw new AnalysisException("Row-key column '" + column.getName() +
+          "' must be explicitly mentioned in column permutation.");
+    }
+  }
+
+  /**
+   * For partitioned tables, checks that all partition columns are mentioned.
+   */
+  private void checkRequiredPartitionedColumns(Set<String> 
mentionedColumnNames)
+      throws AnalysisException {
+    int numClusteringCols = table_.getNumClusteringCols();
+    List<String> missingPartitionColumnNames = Lists.newArrayList();
+    for (Column column : table_.getColumns()) {
+      if (!mentionedColumnNames.contains(column.getName())
+          && column.getPosition() < numClusteringCols) {
+        missingPartitionColumnNames.add(column.getName());
+      }
+    }
+
+    if (!missingPartitionColumnNames.isEmpty()) {
+      throw new AnalysisException(
+          "Not enough partition columns mentioned in query. Missing columns 
are: " +
+          Joiner.on(", ").join(missingPartitionColumnNames));
+    }
+  }
+
+  /**
    * Performs four final parts of the analysis:
    * 1. Checks type compatibility between all expressions and their targets
    *
@@ -515,7 +610,7 @@ public class InsertStmt extends StatementBase {
    *
    * 3. Populates resultExprs_ with type-compatible expressions, in Hive 
column order,
    * for all expressions in the select-list. Unmentioned columns are assigned 
NULL literal
-   * expressions.
+   * expressions, unless this is an UPSERT.
    *
    * 4. Result exprs for key columns of Kudu tables are stored in 
primaryKeyExprs_.
    *
@@ -581,21 +676,24 @@ public class InsertStmt extends StatementBase {
     }
 
     // Finally, 'undo' the permutation so that the selectListExprs are in Hive 
column
-    // order, and add NULL expressions to all missing columns.
-    for (Column tblColumn: table_.getColumnsInHiveOrder()) {
+    // order, and add NULL expressions to all missing columns, unless this is 
an UPSERT.
+    ArrayList<Column> columns = table_.getColumnsInHiveOrder();
+    for (int col = 0; col < columns.size(); ++col) {
+      Column tblColumn = columns.get(col);
       boolean matchFound = false;
       for (int i = 0; i < selectListExprs.size(); ++i) {
         if 
(selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
           resultExprs_.add(selectListExprs.get(i));
+          if (isUpsert_) mentionedUpsertColumns_.add(col);
           matchFound = true;
           break;
         }
       }
       // If no match is found, either the column is a clustering column with a 
static
       // value, or it was unmentioned and therefore should have a NULL 
select-list
-      // expression.
+      // expression if this is an INSERT.
       if (!matchFound) {
-        if (tblColumn.getPosition() >= numClusteringCols) {
+        if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) {
           // Unmentioned non-clustering columns get NULL literals with the 
appropriate
           // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
           resultExprs_.add(NullLiteral.create(tblColumn.getType()));
@@ -666,6 +764,8 @@ public class InsertStmt extends StatementBase {
     queryStmt_.rewriteExprs(rewriter);
   }
 
+  private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; }
+
   public List<String> getPlanHints() { return planHints_; }
   public TableName getTargetTableName() { return targetTableName_; }
   public Table getTargetTable() { return table_; }
@@ -687,8 +787,9 @@ public class InsertStmt extends StatementBase {
   public DataSink createDataSink() {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
-    return TableSink.create(table_, TableSink.Op.INSERT, partitionKeyExprs_,
-        ImmutableList.<Integer>of(), overwrite_, ignoreDuplicates_);
+    Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty());
+    return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : 
TableSink.Op.INSERT,
+        partitionKeyExprs_, mentionedUpsertColumns_, overwrite_, 
ignoreDuplicates_);
   }
 
   /**
@@ -708,7 +809,7 @@ public class InsertStmt extends StatementBase {
 
     if (withClause_ != null) strBuilder.append(withClause_.toSql() + " ");
 
-    strBuilder.append("INSERT ");
+    strBuilder.append(getOpName() + " ");
     if (overwrite_) {
       strBuilder.append("OVERWRITE ");
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index 3d98aca..0a99af4 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -58,14 +58,16 @@ public class KuduTableSink extends TableSink {
     StringBuilder output = new StringBuilder();
     output.append(prefix + sinkOp_.toExplainString());
     output.append(" KUDU [" + targetTable_.getFullName() + "]\n");
-    output.append(detailPrefix);
-    if (sinkOp_ == Op.INSERT) {
-      output.append("check unique keys: ");
-    } else {
-      output.append("check keys exist: ");
+    if (sinkOp_ != Op.UPSERT) {
+      output.append(detailPrefix);
+      if (sinkOp_ == Op.INSERT) {
+        output.append("check unique keys: ");
+      } else {
+        output.append("check keys exist: ");
+      }
+      output.append(ignoreNotFoundOrDuplicate_);
+      output.append("\n");
     }
-    output.append(ignoreNotFoundOrDuplicate_);
-    output.append("\n");
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(PrintUtils.printHosts(detailPrefix, 
fragment_.getNumNodes()));
       output.append(PrintUtils.printMemCost(" ", perHostMemCost_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java 
b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 788bb50..102b280 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -51,6 +51,13 @@ public abstract class TableSink extends DataSink {
       @Override
       public TSinkAction toThrift() { return TSinkAction.UPDATE; }
     },
+    UPSERT {
+      @Override
+      public String toExplainString() { return "UPSERT INTO"; }
+
+      @Override
+      public TSinkAction toThrift() { return TSinkAction.UPSERT; }
+    },
     DELETE {
       @Override
       public String toExplainString() { return "DELETE FROM"; }
@@ -105,6 +112,8 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(overwrite == false);
       // Partition clauses don't make sense for Kudu inserts.
       Preconditions.checkState(partitionKeyExprs.isEmpty());
+      // UPSERT is incompatible with ignoreDuplicates.
+      Preconditions.checkState(sinkAction != Op.UPSERT || !ignoreDuplicates);
       return new KuduTableSink(table, sinkAction, referencedColumns, 
ignoreDuplicates);
     } else {
       throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex 
b/fe/src/main/jflex/sql-scanner.flex
index def0be2..099ceda 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -228,6 +228,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
     keywordMap.put("union", new Integer(SqlParserSymbols.KW_UNION));
     keywordMap.put("update", new Integer(SqlParserSymbols.KW_UPDATE));
     keywordMap.put("update_fn", new Integer(SqlParserSymbols.KW_UPDATE_FN));
+    keywordMap.put("upsert", new Integer(SqlParserSymbols.KW_UPSERT));
     keywordMap.put("use", new Integer(SqlParserSymbols.KW_USE));
     keywordMap.put("using", new Integer(SqlParserSymbols.KW_USING));
     keywordMap.put("values", new Integer(SqlParserSymbols.KW_VALUES));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index cbd2d07..a6760de 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -30,6 +30,8 @@ import org.apache.impala.common.AnalysisException;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.testutil.TestUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -2462,6 +2464,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "(1, true, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a', cast(0 as timestamp), 2009, 
10)," +
         "(2, false, 2, 2, NULL, 2, 2.0, 2.0, 'b', 'b', cast(0 as timestamp), 
2009, 2)," +
         "(3, true, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c', cast(0 as timestamp), 2009, 
3))");
+
     // Test multiple aliases. Values() is like union, the column labels are 
'x' and 'y'.
     AnalyzesOk("values((1 as x, 'a' as y), (2 as k, 'b' as j))");
     // Test order by, offset and limit.
@@ -2820,7 +2823,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     // Cannot insert into a view.
     AnalysisError("insert into functional.alltypes_view partition(year, month) 
" +
         "select * from functional.alltypes",
-        "Impala does not support inserting into views: 
functional.alltypes_view");
+        "Impala does not support INSERTing into views: 
functional.alltypes_view");
     // Cannot load into a view.
     AnalysisError("load data inpath 
'/test-warehouse/tpch.lineitem/lineitem.tbl' " +
         "into table functional.alltypes_view",
@@ -2978,6 +2981,13 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "'b.int_array_col' correlated with an outer block as well as an " +
         "uncorrelated one 'functional.alltypestiny':\n" +
         "SELECT item FROM b.int_array_col, functional.alltypestiny");
+
+    if (RuntimeEnv.INSTANCE.isKuduSupported()) {
+      // Key columns missing from permutation
+      AnalysisError("insert into functional_kudu.testtbl(zip) values(1)",
+          "All primary key columns must be specified for INSERTing into Kudu 
tables. " +
+          "Missing columns are: id");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
----------------------------------------------------------------------
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index fb2d63a..329be5c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -1163,6 +1163,14 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
         "select * from functional.alltypestiny where id = (select 1) " +
         "union select * from functional.alltypestiny where id = (select 2)");
 
+    // UPSERT
+    AnalyzesOk("upsert into functional_kudu.testtbl select * from " +
+        "functional_kudu.testtbl where id in (select id from 
functional_kudu.testtbl " +
+        "where zip = 0)");
+    AnalyzesOk("upsert into functional_kudu.testtbl select * from " +
+        "functional_kudu.testtbl union select bigint_col, string_col, int_col 
from " +
+        "functional.alltypes");
+
     // CTAS with correlated subqueries
     AnalyzesOk("create table functional.test_tbl as select * from " +
         "functional.alltypes t where t.id in (select id from 
functional.alltypesagg " +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
----------------------------------------------------------------------
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
new file mode 100644
index 0000000..118b322
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.junit.Test;
+
+import org.apache.impala.testutil.TestUtils;
+
+public class AnalyzeUpsertStmtTest extends AnalyzerTest {
+  @Test
+  public void TestUpsert() {
+    TestUtils.assumeKuduIsSupported();
+    // VALUES clause
+    AnalyzesOk("upsert into table functional_kudu.testtbl values(1, 'a', 1)");
+    AnalyzesOk("upsert into table functional_kudu.testtbl(id) values(1)");
+    AnalyzesOk("upsert into table functional_kudu.testtbl values(1, 'a', 1), " 
+
+        "(2, 'b', 2), (3, 'c', 3)");
+    // SELECT clause
+    AnalyzesOk("upsert into functional_kudu.testtbl select bigint_col, 
string_col, " +
+        "int_col from functional.alltypes");
+    // Permutation lists
+    AnalyzesOk("upsert into table functional_kudu.testtbl(id) select 
bigint_col " +
+        "from functional.alltypes");
+    AnalyzesOk("upsert into table functional_kudu.testtbl(id, name) select 
bigint_col, " +
+        "string_col from functional.alltypes");
+    AnalyzesOk("upsert into table functional_kudu.testtbl(name, zip, id) 
select " +
+        "string_col, int_col, bigint_col from functional.alltypes");
+    // WITH clause
+    AnalyzesOk("with t1 as (select 1, 'a', 2) upsert into 
functional_kudu.testtbl " +
+        "select * from t1");
+    AnalyzesOk("with t1 as (select * from functional.alltypes) upsert into " +
+        "functional_kudu.testtbl select bigint_col, string_col, int_col from 
t1");
+    // WITH belonging to the select clause
+    AnalyzesOk("upsert into functional_kudu.testtbl with t1 as (select * from 
" +
+        "functional.alltypes) select bigint_col, string_col, int_col from t1");
+    AnalyzesOk("upsert into functional_kudu.testtbl(id) with t1 as (select * 
from " +
+        "functional.alltypes) select bigint_col from t1");
+    // Multiple WITH clauses
+    AnalyzesOk("with t1 as (select * from functional.alltypestiny) " +
+        "upsert into functional_kudu.testtbl with t2 as (select * from " +
+        "functional.alltypessmall) select bigint_col, string_col, int_col from 
t1");
+    // Correlated inline view
+    AnalyzesOk("upsert into table functional_kudu.testtbl " +
+        "select a.id, string_col, b.month " +
+        "from functional.alltypes a, functional.allcomplextypes b, " +
+        "(select item from b.int_array_col) v1 " +
+        "where a.id = b.id");
+    // Hint
+    AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select * 
from " +
+        "functional_kudu.testtbl");
+    // Incorrect hint, results in warning
+    AnalyzesOk("upsert into table functional_kudu.testtbl [badhint] select * 
from " +
+        "functional_kudu.testtbl", "INSERT hint not recognized: badhint");
+
+    // Key columns missing from permutation
+    AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)",
+        "All primary key columns must be specified for UPSERTing into Kudu 
tables. " +
+        "Missing columns are: id");
+    // SELECT clause with wrong number of columns
+    AnalysisError("upsert into functional_kudu.testtbl select * from 
functional.alltypes",
+        "Target table 'functional_kudu.testtbl' has fewer columns (3) than the 
SELECT " +
+        "/ VALUES clause returns (13)");
+    // VALUES clause with wrong number of columns
+    AnalysisError("upsert into functional_kudu.testtbl values(1)", "Target 
table " +
+        "'functional_kudu.testtbl' has more columns (3) than the SELECT / 
VALUES " +
+        "clause returns (1)");
+    // Permutation with wrong number of columns
+    AnalysisError("upsert into functional_kudu.testtbl(id, name, zip) 
values(1)",
+        "Column permutation mentions more columns (3) than the SELECT / VALUES 
" +
+        "clause returns (1)");
+    // Type mismatch
+    AnalysisError("upsert into functional_kudu.testtbl values(1, 1, 1)",
+        "Target table 'functional_kudu.testtbl' is incompatible with source " +
+        "expressions.\nExpression '1' (type: TINYINT) is not compatible with 
column " +
+        "'name' (type: STRING)");
+    // Permutation with type mismatch
+    AnalysisError("upsert into functional_kudu.testtbl(zip, id, name) " +
+        "values('a', 'a', 'a')", "Target table 'functional_kudu.testtbl' is " +
+        "incompatible with source expressions.\nExpression ''a'' (type: 
STRING) is not " +
+        "compatible with column 'zip' (type: INT)");
+    // Permutation with invalid column name
+    AnalysisError("upsert into functional_kudu.testtbl (id, name, invalid) 
values " +
+        "(1, 'a', 1)", "Unknown column 'invalid' in column permutation");
+    // Permutation with repeated column
+    AnalysisError("upsert into functional_kudu.testtbl (id, name, zip, id) 
values " +
+        "(1, 'a', 1, 1)", "Duplicate column 'id' in column permutation");
+    // UPSERT into non-Kudu table
+    AnalysisError("upsert into functional.alltypes select * from 
functional.alltypes",
+        "UPSERT is only supported for Kudu tables");
+    // Unknown target DB
+    AnalysisError("upsert into UNKNOWNDB.testtbl select * " +
+        "from functional.alltypesnopart", "Database does not exist: 
UNKNOWNDB");
+    // WITH-clause tables cannot be upserted into
+    AnalysisError("with t1 as (select 'a' x) upsert into t1 values('b' x)",
+        "Table does not exist: default.t1");
+    // Cannot upsert into a view
+    AnalysisError("upsert into functional.alltypes_view select * from " +
+        "functional.alltypes",
+        "Impala does not support UPSERTing into views: 
functional.alltypes_view");
+    // Upsert with uncorrelated inline view
+    AnalysisError("upsert into table functional_kudu.testtbl " +
+        "select a.id, a.string_col, b.month " +
+        "from functional.alltypes a, functional.allcomplextypes b, " +
+        "(select item from b.int_array_col, functional.alltypestiny) v1 " +
+        "where a.id = b.id",
+        "Nested query is illegal because it contains a table reference " +
+        "'b.int_array_col' correlated with an outer block as well as an " +
+        "uncorrelated one 'functional.alltypestiny':\n" +
+        "SELECT item FROM b.int_array_col, functional.alltypestiny");
+    // Illegal complex-typed expr
+    AnalysisError("upsert into functional_kudu.testtbl " +
+        "select int_struct_col from functional.allcomplextypes",
+        "Expr 'int_struct_col' in select list returns a " +
+        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
+        "Only scalar types are allowed in the select list.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 993f489..b9d1595 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -428,6 +428,10 @@ public class AnalyzerTest extends FrontendTestBase {
         "select id from (select id+2 from functional_hbase.alltypessmall) a",
         "Could not resolve column/field reference: 'id'");
 
+    // Analysis error from explain upsert
+    AnalysisError("explain upsert into table functional.alltypes select * from 
" +
+        "functional.alltypes", "UPSERT is only supported for Kudu tables");
+
     // Positive test for explain query
     AnalyzesOk("explain select * from functional.AllTypes");
 
@@ -437,6 +441,10 @@ public class AnalyzerTest extends FrontendTestBase {
         "select id, bool_col, tinyint_col, smallint_col, int_col, int_col, " +
         "float_col, float_col, date_string_col, string_col, timestamp_col " +
         "from functional.alltypes");
+
+    // Positive test for explain upsert
+    AnalyzesOk("explain upsert into table functional_kudu.testtbl select * 
from " +
+        "functional_kudu.testtbl");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index c163d70..092e146 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -393,6 +393,12 @@ public class ParserTest {
           "insert overwrite t(a, b) partition(x, y) %sfoo,bar,baz%s select * 
from t",
           prefix, suffix), "foo", "bar", "baz");
 
+      // Test upsert hints.
+      ParsesOk(String.format("upsert into t %sshuffle%s select * from t", 
prefix,
+          suffix));
+      ParsesOk(String.format("upsert into t (x, y) %sshuffle%s select * from 
t", prefix,
+          suffix));
+
       // Test TableRef hints.
       TestTableHints(String.format(
           "select * from functional.alltypes %sschedule_disk_local%s", prefix, 
suffix),
@@ -761,6 +767,11 @@ public class ParserTest {
     ParsesOk("insert overwrite table t select a from test union select a from 
test " +
         "union select a from test union select a from test");
 
+    // Union in upsert query.
+    ParsesOk("upsert into table t select a from test union select a from 
test");
+    ParsesOk("upsert into table t select a from test union select a from test 
" +
+        "union select a from test union select a from test");
+
     // No complete select statement on lhs.
     ParserError("a from test union select a from test");
     // No complete select statement on rhs.
@@ -777,12 +788,14 @@ public class ParserTest {
     ParsesOk("select * from (values(1, 'a', abc, 1.0, *)) as t");
     ParsesOk("values(1, 'a', abc, 1.0, *) union all values(1, 'a', abc, 1.0, 
*)");
     ParsesOk("insert into t values(1, 'a', abc, 1.0, *)");
+    ParsesOk("upsert into t values(1, 'a', abc, 1.0, *)");
     // Values stmt with multiple rows.
     ParsesOk("values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
     ParsesOk("select * from (values(1, abc), ('x', cde), (2), (efg, fgh, ghi)) 
as t");
     ParsesOk("values(1, abc), ('x', cde), (2), (efg, fgh, ghi) " +
         "union all values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
     ParsesOk("insert into t values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
+    ParsesOk("upsert into t values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
     // Test additional parenthesis.
     ParsesOk("(values(1, abc), ('x', cde), (2), (efg, fgh, ghi))");
     ParsesOk("values((1, abc), ('x', cde), (2), (efg, fgh, ghi))");
@@ -838,6 +851,14 @@ public class ParserTest {
     // With clause before insert statement.
     ParsesOk("with t as (select 1) insert into x select * from t");
     ParsesOk("with t(x) as (select 1) insert into x select * from t");
+    // With clause in query statement of upsert statement.
+    ParsesOk("upsert into x with t as (select * from tab) select * from t");
+    ParsesOk("upsert into x with t(x, y) as (select * from tab) select * from 
t");
+    ParsesOk("upsert into x with t as (values(1, 2, 3)) select * from t");
+    ParsesOk("upsert into x with t(x, y) as (values(1, 2, 3)) select * from 
t");
+    // With clause before upsert statement.
+    ParsesOk("with t as (select 1) upsert into x select * from t");
+    ParsesOk("with t(x) as (select 1) upsert into x select * from t");
 
     // Test quoted identifier or string literal as table alias.
     ParsesOk("with `t1` as (select 1 a), 't2' as (select 2 a), \"t3\" as 
(select 3 a)" +
@@ -854,6 +875,10 @@ public class ParserTest {
     ParsesOk("with t as (select 1) insert into x with t as (select 2) select * 
from t");
     ParsesOk("with t(c1) as (select 1) " +
         "insert into x with t(c2) as (select 2) select * from t");
+    // Multiple with clauses. One before the upsert and one inside the query 
statement.
+    ParsesOk("with t as (select 1) upsert into x with t as (select 2) select * 
from t");
+    ParsesOk("with t(c1) as (select 1) " +
+        "upsert into x with t(c2) as (select 2) select * from t");
 
     // Empty with clause.
     ParserError("with t as () select 1");
@@ -873,6 +898,9 @@ public class ParserTest {
     // Insert in with clause is not valid.
     ParserError("with t as (insert into x select * from tab) select * from t");
     ParserError("with t(c1) as (insert into x select * from tab) select * from 
t");
+    // Upsert in with clause is not valid.
+    ParserError("with t as (upsert into x select * from tab) select * from t");
+    ParserError("with t(c1) as (upsert into x select * from tab) select * from 
t");
     // Union operands need to be parenthesized to have their own with clause.
     ParserError("select * from t union all with t as (select 2) select * from 
t");
   }
@@ -966,7 +994,7 @@ public class ParserTest {
     ParsesOk("select a from `abc\u007fabc`");
 
     // Quoted identifiers can contain keywords.
-    ParsesOk("select `select`, `insert` from `table` where `where` = 10");
+    ParsesOk("select `select`, `insert`, `upsert` from `table` where `where` = 
10");
 
     // Quoted identifiers cannot contain "`"
     ParserError("select a from `abcde`abcde`");
@@ -1610,6 +1638,47 @@ public class ParserTest {
   }
 
   @Test
+  public void TestUpsert() {
+    for (String optTbl: new String[] {"", "table"}) {
+      // SELECT clause
+      ParsesOk(String.format("upsert into %s t select a from src", optTbl));
+      // VALUES clause
+      ParsesOk(String.format("upsert into %s t values (1, 2, 3)", optTbl));
+      // Permutation
+      ParsesOk(String.format("upsert into %s t (a,b,c) values(1,2,3)", 
optTbl));
+      // Permutation with mismatched select list (should parse fine)
+      ParsesOk(String.format("upsert into %s t (a,b,c) values(1,2,3,4,5,6)", 
optTbl));
+      // Empty permutation list
+      ParsesOk(String.format("upsert into %s t () select 1 from a", optTbl));
+      // Permutation with optional query statement
+      ParsesOk(String.format("upsert into %s t () ", optTbl));
+      // WITH clause
+      ParsesOk(String.format("with x as (select a from src where b > 5) upsert 
into %s " +
+          "t select * from x", optTbl));
+
+      // Missing query statement
+      ParserError(String.format("upsert into %s t", optTbl));
+      // Missing 'into'.
+      ParserError(String.format("upsert %s t select a from src where b > 5", 
optTbl));
+      // Missing target table identifier.
+      ParserError(String.format("upsert into %s select a from src where b > 
5", optTbl));
+      // No comma in permutation list
+      ParserError(String.format("upsert %s t(a b c) select 1 from a", optTbl));
+      // Can't use strings as identifiers in permutation list
+      ParserError(String.format("upsert %s t('a') select 1 from a", optTbl));
+      // Expressions not allowed in permutation list
+      ParserError(String.format("upsert %s t(a=1, b) select 1 from a", 
optTbl));
+      // Upsert doesn't support ignore.
+      ParserError(String.format("upsert ignore into %s t select a from src", 
optTbl));
+      // Upsert doesn't support partition clauses.
+      ParserError(String.format(
+          "upsert into %s t partition (pk1=10) select a from src", optTbl));
+      // Upsert doesn't support overwrite.
+      ParserError(String.format("upsert overwrite %s t select 1 from src", 
optTbl));
+    }
+  }
+
+  @Test
   public void TestUpdate() {
     ParsesOk("update t set x = 3 where a < b");
     ParsesOk("update t set x = (3 + length(\"hallo\")) where a < 'adasas'");
@@ -2512,6 +2581,7 @@ public class ParserTest {
     ParserError("CREATE VIEW Foo.Bar (x) AS");
     // Invalid view definitions. A view definition must be a query statement.
     ParserError("CREATE VIEW Foo.Bar (x) AS INSERT INTO t select * from t");
+    ParserError("CREATE VIEW Foo.Bar (x) AS UPSERT INTO t select * from t");
     ParserError("CREATE VIEW Foo.Bar (x) AS CREATE TABLE Wrong (i int)");
     ParserError("CREATE VIEW Foo.Bar (x) AS ALTER TABLE Foo COLUMNS (i int, s 
string)");
     ParserError("CREATE VIEW Foo.Bar (x) AS CREATE VIEW Foo.Bar AS SELECT 1");
@@ -2541,6 +2611,7 @@ public class ParserTest {
     ParserError("ALTER VIEW Foo.Bar AS");
     // Invalid view definitions. A view definition must be a query statement.
     ParserError("ALTER VIEW Foo.Bar AS INSERT INTO t select * from t");
+    ParserError("ALTER VIEW Foo.Bar AS UPSERT INTO t select * from t");
     ParserError("ALTER VIEW Foo.Bar AS CREATE TABLE Wrong (i int)");
     ParserError("ALTER VIEW Foo.Bar AS ALTER TABLE Foo COLUMNS (i int, s 
string)");
     ParserError("ALTER VIEW Foo.Bar AS CREATE VIEW Foo.Bar AS SELECT 1, 2, 3");
@@ -2571,8 +2642,9 @@ public class ParserTest {
     ParserError("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUET AS 
WITH");
     ParserError("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUET AS");
 
-    // INSERT statements are not allowed
+    // INSERT/UPSERT statements are not allowed
     ParserError("CREATE TABLE Foo AS INSERT INTO Foo SELECT 1");
+    ParserError("CREATE TABLE Foo AS UPSERT INTO Foo SELECT 1");
 
     // Column and partition definitions not allowed
     ParserError("CREATE TABLE Foo(i int) AS SELECT 1");
@@ -2840,7 +2912,7 @@ public class ParserTest {
         "Encountered: IDENTIFIER\n" +
         "Expected: ALTER, COMPUTE, CREATE, DELETE, DESCRIBE, DROP, EXPLAIN, 
GRANT, " +
         "INSERT, INVALIDATE, LOAD, REFRESH, REVOKE, SELECT, SET, SHOW, 
TRUNCATE, " +
-        "UPDATE, USE, VALUES, WITH\n");
+        "UPDATE, UPSERT, USE, VALUES, WITH\n");
 
     // missing select list
     ParserError("select from t",
@@ -2981,6 +3053,7 @@ public class ParserTest {
   public void TestExplain() {
     ParsesOk("explain select a from tbl");
     ParsesOk("explain insert into tbl select a, b, c, d from tbl");
+    ParsesOk("explain upsert into tbl select a, b, c, d from tbl");
     ParserError("explain");
     // cannot EXPLAIN an explain stmt
     ParserError("explain explain select a from tbl");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 371e811..b5cf446 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -655,6 +655,8 @@ public class ToSqlTest extends FrontendTestBase {
         "values(1, true, 1, 1, 10, 10, 10.0, 10.0, 'a', 'a', cast (0 as 
timestamp))",
         "INSERT INTO TABLE functional.alltypessmall PARTITION (year=2009, 
month=4) " +
         "VALUES(1, TRUE, 1, 1, 10, 10, 10.0, 10.0, 'a', 'a', CAST(0 AS 
TIMESTAMP))");
+    testToSql("upsert into table functional_kudu.testtbl values(1, 'a', 1)",
+        "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)");
   }
 
   /**
@@ -903,6 +905,11 @@ public class ToSqlTest extends FrontendTestBase {
         "WITH t1 AS (SELECT * FROM functional.alltypes) " +
         "INSERT INTO TABLE functional.alltypes PARTITION (year, month) " +
             "SELECT * FROM t1");
+    // WITH clause in upsert stmt.
+    testToSql("with t1 as (select * from functional.alltypes) upsert into " +
+        "functional_kudu.testtbl select bigint_col, string_col, int_col from 
t1",
+        "WITH t1 AS (SELECT * FROM functional.alltypes) UPSERT INTO TABLE " +
+        "functional_kudu.testtbl SELECT bigint_col, string_col, int_col FROM 
t1");
     // Test joins in WITH-clause view.
     testToSql("with t as (select a.* from functional.alltypes a, " +
             "functional.alltypes b where a.id = b.id) select * from t",
@@ -1016,6 +1023,29 @@ public class ToSqlTest extends FrontendTestBase {
   }
 
   @Test
+  public void upsertTest() {
+    // VALUES clause
+    testToSql("upsert into functional_kudu.testtbl values (1, 'a', 1)",
+        "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)");
+
+    // SELECT clause
+    testToSql("upsert into functional_kudu.testtbl select bigint_col, 
string_col, " +
+        "int_col from functional.alltypes", "UPSERT INTO TABLE 
functional_kudu.testtbl " +
+        "SELECT bigint_col, string_col, int_col FROM functional.alltypes");
+
+    // WITH clause
+    testToSql("with x as (select bigint_col, string_col, int_col from " +
+        "functional.alltypes) upsert into table functional_kudu.testtbl select 
* from x",
+        "WITH x AS (SELECT bigint_col, string_col, int_col FROM 
functional.alltypes) " +
+        "UPSERT INTO TABLE functional_kudu.testtbl SELECT * FROM x");
+
+    // Permutation
+    testToSql("upsert into table functional_kudu.testtbl (zip, id, name) 
values " +
+        "(1, 1, 'a')", "UPSERT INTO TABLE functional_kudu.testtbl(zip, id, 
name) " +
+        "VALUES(1, 1, 'a')");
+  }
+
+  @Test
   public void testAnalyticExprs() {
     testToSql(
         "select sum(int_col) over (partition by id order by tinyint_col "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 6250969..91035d1 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -254,6 +254,12 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testKuduUpsert() {
+    Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
+    runPlannerTestFile("kudu-upsert");
+  }
+
+  @Test
   public void testKuduUpdate() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
     runPlannerTestFile("kudu-update");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
new file mode 100644
index 0000000..b106b02
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -0,0 +1,92 @@
+# simple upsert with select
+upsert into table functional_kudu.testtbl
+select bigint_col, string_col, int_col from functional.alltypes
+where year=2009 and month=05
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=1/24 files=1 size=20.36KB
+====
+# simple upsert with values clause
+upsert into table functional_kudu.testtbl
+values (1, 'a', 1), (2, 'b', 2)
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+00:UNION
+   constant-operands=2
+====
+# upsert with 'with' clause and limit
+with x as (select string_col, count(*) from functional.alltypes group by string_col)
+upsert into table functional_kudu.testtbl
+select a.bigint_col, a.string_col, a.int_col from functional.alltypes a, x
+where x.string_col = a.string_col
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: a.string_col = string_col
+|  runtime filters: RF000 <- string_col
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  group by: string_col
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.string_col
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.string_col = string_col
+|  runtime filters: RF000 <- string_col
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  05:AGGREGATE [FINALIZE]
+|  |  group by: string_col
+|  |
+|  04:EXCHANGE [HASH(string_col)]
+|  |
+|  02:AGGREGATE [STREAMING]
+|  |  group by: string_col
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.string_col
+====
+# upsert with inline view
+upsert into functional_kudu.testtbl
+select v.id, v.string_col, v.cnt from (
+  select id, string_col, cast(count(*) as int) cnt from
+  functional.alltypes
+  group by 1, 2) v
+where cnt < 10
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: id, string_col
+|  having: CAST(count(*) AS INT) < 10
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+upsert into functional_kudu.testtbl /*+ clustered */
+select * from functional_kudu.testtbl
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+01:SORT
+|  order by: id DESC NULLS LAST
+|
+00:SCAN KUDU [functional_kudu.testtbl]
+====
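
The plans above can be reproduced interactively, since EXPLAIN now accepts UPSERT
(see the ParserTest change earlier in this patch). A rough sketch from impala-shell;
the exact plan text depends on table statistics and the planner version:

  EXPLAIN
  UPSERT INTO TABLE functional_kudu.testtbl
  SELECT bigint_col, string_col, int_col
  FROM functional.alltypes
  WHERE year = 2009 AND month = 5;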

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index 2782908..949a25c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -261,6 +261,75 @@ delete ignore a from tdata a, tdata b where a.id = 666
 row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
+select * from tdata
+---- RESULTS
+40,'he',0,43,'e',false
+120,'she',0,99,'f',true
+320,'',2,932,'',false
+1,'unknown',1,43,'aaaaaaaaaaaaaaaaaaaa',false
+2,'david',1,43,'b',false
+3,'todd',1,43,'c',true
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata values (40, 'they', 1, 43, cast('e' as VARCHAR(20)), false),
+(1, NULL, 1, 0, cast('a' as VARCHAR(20)), true)
+---- RESULTS
+====
+---- QUERY
+select * from tdata
+---- RESULTS
+40,'they',1,43,'e',false
+120,'she',0,99,'f',true
+320,'',2,932,'',false
+1,'NULL',1,0,'a',true
+2,'david',1,43,'b',false
+3,'todd',1,43,'c',true
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata (id, valf) values (2, NULL), (120, 20), (0, 0)
+---- RESULTS
+====
+---- QUERY
+select * from tdata
+---- RESULTS
+40,'they',1,43,'e',false
+120,'she',20,99,'f',true
+320,'',2,932,'',false
+1,'NULL',1,0,'a',true
+2,'david',NULL,43,'b',false
+3,'todd',1,43,'c',true
+0,'NULL',0,NULL,'NULL',NULL
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata (valb, name, id)
+select false as valb, 'he' as name, id from tdata where id < 2
+---- RESULTS
+====
+---- QUERY
+select * from tdata
+---- RESULTS
+40,'they',1,43,'e',false
+120,'she',20,99,'f',true
+320,'',2,932,'',false
+1,'he',1,0,'a',false
+2,'david',NULL,43,'b',false
+3,'todd',1,43,'c',true
+0,'he',0,NULL,'NULL',false
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata (id, name) values (null, '')
+---- CATCH
+Could not add Kudu WriteOp.: Invalid argument: column not nullable: id[int32 NOT NULL]
+====
+---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
 # if the Kudu columns are of different types.
 create table impala_3454 (key_1 tinyint, key_2 bigint, PRIMARY KEY (key_1, key_2))
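
To summarize the UPSERT semantics exercised by the kudu_crud.test additions above,
as a sketch against the same tdata table: rows whose key already exists are updated,
rows with a new key are inserted, non-key columns omitted from the column list keep
their old values on updated rows and come back NULL on newly inserted rows, and a
NULL key is rejected because Kudu key columns are not nullable.

  -- id 2 exists, so its row is updated; id 7 (assumed absent) is inserted
  UPSERT INTO TABLE tdata (id, name) VALUES (2, 'updated'), (7, 'inserted');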
