IMPALA-3725 Support Kudu UPSERT in Impala
This patch introduces a new query statement, UPSERT, for Kudu
tables. It operates like an INSERT and uses the same analysis,
planning, and execution machinery as INSERT, except that if
there's a primary key collision, an update is performed instead
of returning an error.
New syntax:
[with_clause] UPSERT INTO [TABLE] table_name [(column list)]
{
query_stmt
| VALUES (value [, value...]) [, (value [, value...]) ...]
}
where the column list, if specified, must contain all of the
primary key columns of table_name, and table_name must be a Kudu table.
This patch also improves the behavior of INSERTing into Kudu
tables without specifying all of the primary key columns; this now
results in an analysis exception, rather than attempting the
INSERT and receiving an error back from Kudu.
Change-Id: I8df5cea36b642e267f85ff6b163f3dd96b8386e9
Reviewed-on: http://gerrit.cloudera.org:8080/4047
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/832fb537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/832fb537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/832fb537
Branch: refs/heads/master
Commit: 832fb537635fb695dcc85d4e27dd3b7646a7e6ab
Parents: e9a4077
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Thu Aug 18 09:57:13 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Nov 5 04:16:54 2016 +0000
----------------------------------------------------------------------
be/src/exec/kudu-table-sink.cc | 15 +-
common/thrift/DataSinks.thrift | 1 +
fe/src/main/cup/sql-parser.cup | 44 +++-
.../analysis/CreateTableAsSelectStmt.java | 4 +-
.../org/apache/impala/analysis/InsertStmt.java | 223 ++++++++++++++-----
.../apache/impala/planner/KuduTableSink.java | 16 +-
.../org/apache/impala/planner/TableSink.java | 9 +
fe/src/main/jflex/sql-scanner.flex | 1 +
.../impala/analysis/AnalyzeStmtsTest.java | 12 +-
.../impala/analysis/AnalyzeSubqueriesTest.java | 8 +
.../impala/analysis/AnalyzeUpsertStmtTest.java | 132 +++++++++++
.../apache/impala/analysis/AnalyzerTest.java | 8 +
.../org/apache/impala/analysis/ParserTest.java | 79 ++++++-
.../org/apache/impala/analysis/ToSqlTest.java | 30 +++
.../org/apache/impala/planner/PlannerTest.java | 6 +
.../queries/PlannerTest/kudu-upsert.test | 92 ++++++++
.../queries/QueryTest/kudu_crud.test | 69 ++++++
17 files changed, 665 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index a9beb29..fc26a53 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -167,6 +167,8 @@ kudu::client::KuduWriteOperation*
KuduTableSink::NewWriteOp() {
return table_->NewInsert();
} else if (sink_action_ == TSinkAction::UPDATE) {
return table_->NewUpdate();
+ } else if (sink_action_ == TSinkAction::UPSERT) {
+ return table_->NewUpsert();
} else {
DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: "
<< sink_action_;
@@ -190,17 +192,24 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch*
batch) {
unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
+ // For INSERT, output_expr_ctxs_ will contain all columns of the table
in order.
+ // For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns
that the op
+ // applies to, i.e. columns explicitly mentioned in the query, and
+ // referenced_columns is then used to map to actual column positions.
int col = kudu_table_sink_.referenced_columns.empty() ?
j : kudu_table_sink_.referenced_columns[j];
void* value = output_expr_ctxs_[j]->GetValue(current_row);
- // If the value is NULL and no explicit column references are provided,
the column
- // should be ignored, else it's explicitly set to NULL.
+ // If the value is NULL, we only need to explicitly set it for UPDATE
and UPSERT.
+ // For INSERT, it can be ignored as unspecified cols will be implicitly
set to NULL.
if (value == NULL) {
- if (!kudu_table_sink_.referenced_columns.empty()) {
+ if (sink_action_ == TSinkAction::UPDATE || sink_action_ ==
TSinkAction::UPSERT) {
+ DCHECK(!kudu_table_sink_.referenced_columns.empty());
KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),
"Could not add Kudu WriteOp.");
+ } else {
+ DCHECK(kudu_table_sink_.referenced_columns.empty());
}
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 83c63b7..26f74cf 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -33,6 +33,7 @@ enum TDataSinkType {
enum TSinkAction {
INSERT,
UPDATE,
+ UPSERT,
DELETE
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 836bc2d..c0865c4 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -262,8 +262,8 @@ terminal
KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES,
KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP,
KW_TINYINT,
KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION,
KW_UPDATE,
- KW_UPDATE_FN, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN,
KW_WHERE,
- KW_WITH;
+ KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW,
KW_WHEN,
+ KW_WHERE, KW_WITH;
terminal COLON, SEMICOLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN,
LBRACKET,
RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
@@ -361,7 +361,7 @@ nonterminal ArrayList<String> opt_plan_hints;
nonterminal TypeDef type_def;
nonterminal Type type;
nonterminal Expr sign_chain_expr;
-nonterminal InsertStmt insert_stmt;
+nonterminal InsertStmt insert_stmt, upsert_stmt;
nonterminal UpdateStmt update_stmt;
nonterminal DeleteStmt delete_stmt;
nonterminal ArrayList<Pair<SlotRef, Expr>> update_set_expr_list;
@@ -521,6 +521,8 @@ stmt ::=
{: RESULT = insert; :}
| update_stmt:update
{: RESULT = update; :}
+ | upsert_stmt:upsert
+ {: RESULT = upsert; :}
| delete_stmt:delete
{: RESULT = delete; :}
| use_stmt:use
@@ -659,6 +661,11 @@ explain_stmt ::=
update.setIsExplain();
RESULT = update;
:}
+ | KW_EXPLAIN upsert_stmt:upsert
+ {:
+ upsert.setIsExplain();
+ RESULT = upsert;
+ :}
| KW_EXPLAIN delete_stmt:delete
{:
delete.setIsExplain();
@@ -673,19 +680,29 @@ insert_stmt ::=
opt_with_clause:w KW_INSERT KW_OVERWRITE opt_ignore:ignore opt_kw_table
table_name:table
LPAREN opt_ident_list:col_perm RPAREN partition_clause:list
opt_plan_hints:hints
opt_query_stmt:query
- {: RESULT = new InsertStmt(w, table, true, list, hints, query, col_perm,
ignore); :}
+ {:
+ RESULT = InsertStmt.createInsert(w, table, true, list, hints, query,
col_perm,
+ ignore);
+ :}
| opt_with_clause:w KW_INSERT KW_OVERWRITE
opt_ignore:ignore opt_kw_table table_name:table
partition_clause:list opt_plan_hints:hints query_stmt:query
- {: RESULT = new InsertStmt(w, table, true, list, hints, query, null,
ignore); :}
+ {:
+ RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, null,
ignore);
+ :}
| opt_with_clause:w KW_INSERT opt_ignore:ignore KW_INTO
opt_kw_table table_name:table LPAREN opt_ident_list:col_perm RPAREN
partition_clause:list opt_plan_hints:hints opt_query_stmt:query
- {: RESULT = new InsertStmt(w, table, false, list, hints, query, col_perm,
ignore); :}
+ {:
+ RESULT = InsertStmt.createInsert(w, table, false, list, hints, query,
col_perm,
+ ignore);
+ :}
| opt_with_clause:w KW_INSERT
opt_ignore:ignore KW_INTO opt_kw_table table_name:table partition_clause:list
opt_plan_hints:hints query_stmt:query
- {: RESULT = new InsertStmt(w, table, false, list, hints, query, null,
ignore); :}
+ {:
+ RESULT = InsertStmt.createInsert(w, table, false, list, hints, query,
null, ignore);
+ :}
;
// Update statements have an optional WHERE and optional FROM clause.
@@ -717,6 +734,17 @@ update_set_expr_list ::=
:}
;
+// Upsert statements have an optional column permutation clause. If the column
permutation
+// is present, the query statement clause is optional as well.
+upsert_stmt ::=
+ opt_with_clause:w KW_UPSERT KW_INTO opt_kw_table table_name:table
+ LPAREN opt_ident_list:col_perm RPAREN opt_plan_hints:hints
opt_query_stmt:query
+ {: RESULT = InsertStmt.createUpsert(w, table, hints, query, col_perm); :}
+ | opt_with_clause:w KW_UPSERT KW_INTO opt_kw_table table_name:table
+ opt_plan_hints:hints query_stmt:query
+ {: RESULT = InsertStmt.createUpsert(w, table, hints, query, null); :}
+ ;
+
// A DELETE statement comes in two main representations, the DELETE keyword
with a path
// specification as the target table with an optional FROM keyword or the
DELETE
// keyword followed by a table alias or reference and a full FROM clause. In
all cases
@@ -3217,6 +3245,8 @@ ident_or_keyword ::=
{: RESULT = r.toString(); :}
| KW_UPDATE_FN:r
{: RESULT = r.toString(); :}
+ | KW_UPSERT:r
+ {: RESULT = r.toString(); :}
| KW_USE:r
{: RESULT = r.toString(); :}
| KW_USING:r
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git
a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 2f5f166..2eb399f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -80,8 +80,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
pkvs.add(new PartitionKeyValue(key, null));
}
}
- insertStmt_ = new InsertStmt(null, createStmt.getTblName(), false, pkvs,
- null, queryStmt, null, false);
+ insertStmt_ = InsertStmt.createInsert(
+ null, createStmt.getTblName(), false, pkvs, null, queryStmt, null,
false);
}
public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 5e457a3..da360b1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
- * Representation of a single insert statement, including the select statement
+ * Representation of a single insert or upsert statement, including the select
statement
* whose results are to be inserted.
*/
public class InsertStmt extends StatementBase {
@@ -70,20 +70,22 @@ public class InsertStmt extends StatementBase {
// auto-generate one (for insert into tbl()) during analysis.
private final boolean needsGeneratedQueryStatement_;
- // The column permutation is specified by writing INSERT INTO tbl(col3,
col1, col2...)
+ // The column permutation is specified by writing:
+ // (INSERT|UPSERT) INTO tbl(col3, col1, col2...)
//
// It is a mapping from select-list expr index to (non-partition) output
column. If
// null, will be set to the default permutation of all non-partition columns
in Hive
- // order.
+ // order or all columns for Kudu tables.
//
// A column is said to be 'mentioned' if it occurs either in the column
permutation, or
// the PARTITION clause. If columnPermutation is null, all non-partition
columns are
// considered mentioned.
//
- // Between them, the columnPermutation and the set of partitionKeyValues
must mention to
+ // Between them, the columnPermutation and the set of partitionKeyValues
must mention
// every partition column in the target table exactly once. Other columns,
if not
- // explicitly mentioned, will be assigned NULL values. Partition columns are
not
- // defaulted to NULL by design, and are not just for NULL-valued partition
slots.
+ // explicitly mentioned, will be assigned NULL values for INSERTs and left
unassigned
+ // for UPSERTs. Partition columns are not defaulted to NULL by design, and
are not just
+ // for NULL-valued partition slots.
//
// Dynamic partition keys may occur in either the permutation or the
PARTITION
// clause. Partition columns with static values may only be mentioned in the
PARTITION
@@ -123,11 +125,20 @@ public class InsertStmt extends StatementBase {
private boolean hasClusteredHint_ = false;
// Output expressions that produce the final results to write to the target
table. May
- // include casts, and NullLiterals where an output column isn't explicitly
mentioned.
- // Set in prepareExpressions(). The i'th expr produces the i'th column of
the target
+ // include casts. Set in prepareExpressions().
+ // If this is an INSERT, will contain one Expr for all non-partition columns
of the
+ // target table with NullLiterals where an output column isn't explicitly
mentioned.
+ // The i'th expr produces the i'th column of the target table.
+ // If this is an UPSERT, will contain one Expr per column mentioned in the
query and
+ // mentionedUpsertColumns_ is used to map between the Exprs and columns in
the target
// table.
private ArrayList<Expr> resultExprs_ = Lists.newArrayList();
+ // Position mapping of exprs in resultExprs_ to columns in the target table -
+ // resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the
target table.
+ // Only used for UPSERT, set in prepareExpressions().
+ private final List<Integer> mentionedUpsertColumns_ = Lists.newArrayList();
+
// Set in analyze(). Exprs corresponding to key columns of Kudu tables.
Empty for
// non-Kudu tables.
private ArrayList<Expr> primaryKeyExprs_ = Lists.newArrayList();
@@ -138,9 +149,29 @@ public class InsertStmt extends StatementBase {
// For tables with primary keys, indicates if duplicate key errors are
ignored.
private final boolean ignoreDuplicates_;
- public InsertStmt(WithClause withClause, TableName targetTable, boolean
overwrite,
+ // True iff this is an UPSERT operation. Only supported for Kudu tables.
+ private final boolean isUpsert_;
+
+ public static InsertStmt createInsert(WithClause withClause, TableName
targetTable,
+ boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
+ List<String> planHints, QueryStmt queryStmt, List<String>
columnPermutation,
+ boolean ignoreDuplicates) {
+ return new InsertStmt(withClause, targetTable, overwrite,
partitionKeyValues,
+ planHints, queryStmt, columnPermutation, ignoreDuplicates, false);
+ }
+
+ public static InsertStmt createUpsert(WithClause withClause, TableName
targetTable,
+ List<String> planHints, QueryStmt queryStmt, List<String>
columnPermutation) {
+ return new InsertStmt(withClause, targetTable, false, null, planHints,
queryStmt,
+ columnPermutation, false, true);
+ }
+
+ protected InsertStmt(WithClause withClause, TableName targetTable, boolean
overwrite,
List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
- QueryStmt queryStmt, List<String> columnPermutation, boolean
ignoreDuplicates) {
+ QueryStmt queryStmt, List<String> columnPermutation, boolean
ignoreDuplicates,
+ boolean isUpsert) {
+ Preconditions.checkState(!isUpsert || (!overwrite && !ignoreDuplicates &&
+ partitionKeyValues == null));
withClause_ = withClause;
targetTableName_ = targetTable;
originalTableName_ = targetTableName_;
@@ -152,6 +183,7 @@ public class InsertStmt extends StatementBase {
columnPermutation_ = columnPermutation;
table_ = null;
ignoreDuplicates_ = ignoreDuplicates;
+ isUpsert_ = isUpsert;
}
/**
@@ -170,6 +202,7 @@ public class InsertStmt extends StatementBase {
columnPermutation_ = other.columnPermutation_;
table_ = other.table_;
ignoreDuplicates_ = other.ignoreDuplicates_;
+ isUpsert_ = other.isUpsert_;
}
@Override
@@ -184,6 +217,7 @@ public class InsertStmt extends StatementBase {
hasNoShuffleHint_ = false;
hasClusteredHint_ = false;
resultExprs_.clear();
+ mentionedUpsertColumns_.clear();
primaryKeyExprs_.clear();
}
@@ -223,7 +257,7 @@ public class InsertStmt extends StatementBase {
// Set target table and perform table-type specific analysis and auth
checking.
// Also checks if the target table is missing.
- setTargetTable(analyzer);
+ analyzeTargetTable(analyzer);
// Abort analysis if there are any missing tables beyond this point.
if (!analyzer.getMissingTbls().isEmpty()) {
@@ -251,7 +285,8 @@ public class InsertStmt extends StatementBase {
// Finally, prepareExpressions analyzes the expressions themselves, and
confirms that
// they are type-compatible with the target columns. Where columns are not
mentioned
// (and by this point, we know that missing columns are not partition
columns),
- // prepareExpressions assigns them a NULL literal expressions.
+ // prepareExpressions assigns them a NULL literal expressions, unless this
is an
+ // UPSERT, in which case we don't want to overwrite unmentioned columns
with NULL.
// An null permutation clause is the same as listing all non-partition
columns in
// order.
@@ -332,13 +367,12 @@ public class InsertStmt extends StatementBase {
/**
* Sets table_ based on targetTableName_ and performs table-type specific
analysis:
- * - Partition clause is invalid for unpartitioned Hdfs tables and HBase
tables
- * - Overwrite is invalid for HBase tables
- * - Check INSERT privileges as well as write access to Hdfs paths
- * - Cannot insert into a view
+ * - Cannot (in|up)sert into a view
+ * - Cannot (in|up)sert into a table with unsupported column types
+ * - Analysis specific to insert and upsert operations
* Adds table_ to the analyzer's descriptor table if analysis succeeds.
*/
- private void setTargetTable(Analyzer analyzer) throws AnalysisException {
+ private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
// If the table has not yet been set, load it from the Catalog. This
allows for
// callers to set a table to analyze that may not actually be created in
the Catalog.
// One example use case is CREATE TABLE AS SELECT which must run analysis
on the
@@ -356,21 +390,42 @@ public class InsertStmt extends StatementBase {
.allOf(Privilege.INSERT).toRequest());
}
- // We do not support inserting into views.
+ // We do not support (in|up)serting into views.
if (table_ instanceof View) {
throw new AnalysisException(
- String.format("Impala does not support inserting into views: %s",
- table_.getFullName()));
+ String.format("Impala does not support %sing into views: %s",
getOpName(),
+ table_.getFullName()));
}
+ // We do not support (in|up)serting into tables with unsupported column
types.
for (Column c: table_.getColumns()) {
if (!c.getType().isSupported()) {
- throw new AnalysisException(String.format("Unable to INSERT into
target table " +
+ throw new AnalysisException(String.format("Unable to %s into target
table " +
"(%s) because the column '%s' has an unsupported type '%s'.",
- targetTableName_, c.getName(), c.getType().toSql()));
+ getOpName(), targetTableName_, c.getName(), c.getType().toSql()));
}
}
+ // Perform operation-specific analysis.
+ if (isUpsert_) {
+ if (!(table_ instanceof KuduTable)) {
+ throw new AnalysisException("UPSERT is only supported for Kudu
tables");
+ }
+ } else {
+ analyzeTableForInsert(analyzer);
+ }
+
+ // Add target table to descriptor table.
+ analyzer.getDescTbl().setTargetTable(table_);
+ }
+
+ /**
+ * Performs INSERT-specific table analysis:
+ * - Partition clause is invalid for unpartitioned or HBase tables
+ * - Check INSERT privileges as well as write access to Hdfs paths
+ * - Overwrite is invalid for HBase and Kudu tables
+ */
+ private void analyzeTableForInsert(Analyzer analyzer) throws
AnalysisException {
boolean isHBaseTable = (table_ instanceof HBaseTable);
int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
@@ -433,48 +488,28 @@ public class InsertStmt extends StatementBase {
if (isHBaseTable && overwrite_) {
throw new AnalysisException("HBase doesn't have a way to perform INSERT
OVERWRITE");
}
-
- // Add target table to descriptor table.
- analyzer.getDescTbl().setTargetTable(table_);
}
/**
- * Checks that the column permutation + select list + static partition exprs
+
- * dynamic partition exprs collectively cover exactly all columns in the
target table
- * (not more of fewer).
+ * Checks that the column permutation + select list + static partition exprs
+ dynamic
+ * partition exprs collectively cover exactly all required columns in the
target table,
+ * depending on the table type.
*/
private void checkColumnCoverage(ArrayList<Column> selectExprTargetColumns,
Set<String> mentionedColumnNames, int numSelectListExprs,
int numStaticPartitionExprs) throws AnalysisException {
- boolean isHBaseTable = (table_ instanceof HBaseTable);
- int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
- // Check that all columns are mentioned by the permutation and partition
clauses
+ // Check that all required cols are mentioned by the permutation and
partition clauses
if (selectExprTargetColumns.size() + numStaticPartitionExprs !=
table_.getColumns().size()) {
// We've already ruled out too many columns in the permutation and
partition clauses
// by checking that there are no duplicates and that every column
mentioned actually
- // exists. So all columns aren't mentioned in the query. If the
unmentioned columns
- // include partition columns, this is an error.
- List<String> missingColumnNames = Lists.newArrayList();
- for (Column column: table_.getColumns()) {
- if (!mentionedColumnNames.contains(column.getName())) {
- // HBase tables have a single row-key column which is always in
position 0. It
- // must be mentioned, since it is invalid to set it to NULL (which
would
- // otherwise happen by default).
- if (isHBaseTable && column.getPosition() == 0) {
- throw new AnalysisException("Row-key column '" + column.getName() +
- "' must be explicitly mentioned in column permutation.");
- }
- if (column.getPosition() < numClusteringCols) {
- missingColumnNames.add(column.getName());
- }
- }
- }
-
- if (!missingColumnNames.isEmpty()) {
- throw new AnalysisException(
- "Not enough partition columns mentioned in query. Missing columns
are: " +
- Joiner.on(", ").join(missingColumnNames));
+ // exists. So all columns aren't mentioned in the query.
+ if (table_ instanceof KuduTable) {
+ checkRequiredKuduColumns(mentionedColumnNames);
+ } else if (table_ instanceof HBaseTable) {
+ checkRequiredHBaseColumns(mentionedColumnNames);
+ } else if (table_.getNumClusteringCols() > 0) {
+ checkRequiredPartitionedColumns(mentionedColumnNames);
}
}
@@ -507,6 +542,66 @@ public class InsertStmt extends StatementBase {
}
/**
+ * For a Kudu table, checks that all key columns are mentioned.
+ */
+ private void checkRequiredKuduColumns(Set<String> mentionedColumnNames)
+ throws AnalysisException {
+ Preconditions.checkState(table_ instanceof KuduTable);
+ List<String> keyColumns = ((KuduTable) table_).getPrimaryKeyColumnNames();
+ List<String> missingKeyColumnNames = Lists.newArrayList();
+ for (Column column : table_.getColumns()) {
+ if (!mentionedColumnNames.contains(column.getName())
+ && keyColumns.contains(column.getName())) {
+ missingKeyColumnNames.add(column.getName());
+ }
+ }
+
+ if (!missingKeyColumnNames.isEmpty()) {
+ throw new AnalysisException(String.format(
+ "All primary key columns must be specified for %sing into Kudu
tables. " +
+ "Missing columns are: %s", getOpName(),
+ Joiner.on(", ").join(missingKeyColumnNames)));
+ }
+ }
+
+ /**
+ * For an HBase table, checks that the row-key column is mentioned.
+ * HBase tables have a single row-key column which is always in position 0.
It
+ * must be mentioned, since it is invalid to set it to NULL (which would
+ * otherwise happen by default).
+ */
+ private void checkRequiredHBaseColumns(Set<String> mentionedColumnNames)
+ throws AnalysisException {
+ Preconditions.checkState(table_ instanceof HBaseTable);
+ Column column = table_.getColumns().get(0);
+ if (!mentionedColumnNames.contains(column.getName())) {
+ throw new AnalysisException("Row-key column '" + column.getName() +
+ "' must be explicitly mentioned in column permutation.");
+ }
+ }
+
+ /**
+ * For partitioned tables, checks that all partition columns are mentioned.
+ */
+ private void checkRequiredPartitionedColumns(Set<String>
mentionedColumnNames)
+ throws AnalysisException {
+ int numClusteringCols = table_.getNumClusteringCols();
+ List<String> missingPartitionColumnNames = Lists.newArrayList();
+ for (Column column : table_.getColumns()) {
+ if (!mentionedColumnNames.contains(column.getName())
+ && column.getPosition() < numClusteringCols) {
+ missingPartitionColumnNames.add(column.getName());
+ }
+ }
+
+ if (!missingPartitionColumnNames.isEmpty()) {
+ throw new AnalysisException(
+ "Not enough partition columns mentioned in query. Missing columns
are: " +
+ Joiner.on(", ").join(missingPartitionColumnNames));
+ }
+ }
+
+ /**
* Performs four final parts of the analysis:
* 1. Checks type compatibility between all expressions and their targets
*
@@ -515,7 +610,7 @@ public class InsertStmt extends StatementBase {
*
* 3. Populates resultExprs_ with type-compatible expressions, in Hive
column order,
* for all expressions in the select-list. Unmentioned columns are assigned
NULL literal
- * expressions.
+ * expressions, unless this is an UPSERT.
*
* 4. Result exprs for key columns of Kudu tables are stored in
primaryKeyExprs_.
*
@@ -581,21 +676,24 @@ public class InsertStmt extends StatementBase {
}
// Finally, 'undo' the permutation so that the selectListExprs are in Hive
column
- // order, and add NULL expressions to all missing columns.
- for (Column tblColumn: table_.getColumnsInHiveOrder()) {
+ // order, and add NULL expressions to all missing columns, unless this is
an UPSERT.
+ ArrayList<Column> columns = table_.getColumnsInHiveOrder();
+ for (int col = 0; col < columns.size(); ++col) {
+ Column tblColumn = columns.get(col);
boolean matchFound = false;
for (int i = 0; i < selectListExprs.size(); ++i) {
if
(selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
resultExprs_.add(selectListExprs.get(i));
+ if (isUpsert_) mentionedUpsertColumns_.add(col);
matchFound = true;
break;
}
}
// If no match is found, either the column is a clustering column with a
static
// value, or it was unmentioned and therefore should have a NULL
select-list
- // expression.
+ // expression if this is an INSERT.
if (!matchFound) {
- if (tblColumn.getPosition() >= numClusteringCols) {
+ if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) {
// Unmentioned non-clustering columns get NULL literals with the
appropriate
// target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
resultExprs_.add(NullLiteral.create(tblColumn.getType()));
@@ -666,6 +764,8 @@ public class InsertStmt extends StatementBase {
queryStmt_.rewriteExprs(rewriter);
}
+ private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; }
+
public List<String> getPlanHints() { return planHints_; }
public TableName getTargetTableName() { return targetTableName_; }
public Table getTargetTable() { return table_; }
@@ -687,8 +787,9 @@ public class InsertStmt extends StatementBase {
public DataSink createDataSink() {
// analyze() must have been called before.
Preconditions.checkState(table_ != null);
- return TableSink.create(table_, TableSink.Op.INSERT, partitionKeyExprs_,
- ImmutableList.<Integer>of(), overwrite_, ignoreDuplicates_);
+ Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty());
+ return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT :
TableSink.Op.INSERT,
+ partitionKeyExprs_, mentionedUpsertColumns_, overwrite_,
ignoreDuplicates_);
}
/**
@@ -708,7 +809,7 @@ public class InsertStmt extends StatementBase {
if (withClause_ != null) strBuilder.append(withClause_.toSql() + " ");
- strBuilder.append("INSERT ");
+ strBuilder.append(getOpName() + " ");
if (overwrite_) {
strBuilder.append("OVERWRITE ");
} else {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index 3d98aca..0a99af4 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -58,14 +58,16 @@ public class KuduTableSink extends TableSink {
StringBuilder output = new StringBuilder();
output.append(prefix + sinkOp_.toExplainString());
output.append(" KUDU [" + targetTable_.getFullName() + "]\n");
- output.append(detailPrefix);
- if (sinkOp_ == Op.INSERT) {
- output.append("check unique keys: ");
- } else {
- output.append("check keys exist: ");
+ if (sinkOp_ != Op.UPSERT) {
+ output.append(detailPrefix);
+ if (sinkOp_ == Op.INSERT) {
+ output.append("check unique keys: ");
+ } else {
+ output.append("check keys exist: ");
+ }
+ output.append(ignoreNotFoundOrDuplicate_);
+ output.append("\n");
}
- output.append(ignoreNotFoundOrDuplicate_);
- output.append("\n");
if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(PrintUtils.printHosts(detailPrefix,
fragment_.getNumNodes()));
output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java
b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 788bb50..102b280 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -51,6 +51,13 @@ public abstract class TableSink extends DataSink {
@Override
public TSinkAction toThrift() { return TSinkAction.UPDATE; }
},
+ UPSERT {
+ @Override
+ public String toExplainString() { return "UPSERT INTO"; }
+
+ @Override
+ public TSinkAction toThrift() { return TSinkAction.UPSERT; }
+ },
DELETE {
@Override
public String toExplainString() { return "DELETE FROM"; }
@@ -105,6 +112,8 @@ public abstract class TableSink extends DataSink {
Preconditions.checkState(overwrite == false);
// Partition clauses don't make sense for Kudu inserts.
Preconditions.checkState(partitionKeyExprs.isEmpty());
+ // UPSERT is incompatible with ignoreDuplicates.
+ Preconditions.checkState(sinkAction != Op.UPSERT || !ignoreDuplicates);
return new KuduTableSink(table, sinkAction, referencedColumns,
ignoreDuplicates);
} else {
throw new UnsupportedOperationException(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex
b/fe/src/main/jflex/sql-scanner.flex
index def0be2..099ceda 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -228,6 +228,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
keywordMap.put("union", new Integer(SqlParserSymbols.KW_UNION));
keywordMap.put("update", new Integer(SqlParserSymbols.KW_UPDATE));
keywordMap.put("update_fn", new Integer(SqlParserSymbols.KW_UPDATE_FN));
+ keywordMap.put("upsert", new Integer(SqlParserSymbols.KW_UPSERT));
keywordMap.put("use", new Integer(SqlParserSymbols.KW_USE));
keywordMap.put("using", new Integer(SqlParserSymbols.KW_USING));
keywordMap.put("values", new Integer(SqlParserSymbols.KW_VALUES));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index cbd2d07..a6760de 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -30,6 +30,8 @@ import org.apache.impala.common.AnalysisException;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.testutil.TestUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -2462,6 +2464,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
"(1, true, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a', cast(0 as timestamp), 2009,
10)," +
"(2, false, 2, 2, NULL, 2, 2.0, 2.0, 'b', 'b', cast(0 as timestamp),
2009, 2)," +
"(3, true, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c', cast(0 as timestamp), 2009,
3))");
+
// Test multiple aliases. Values() is like union, the column labels are
'x' and 'y'.
AnalyzesOk("values((1 as x, 'a' as y), (2 as k, 'b' as j))");
// Test order by, offset and limit.
@@ -2820,7 +2823,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
// Cannot insert into a view.
AnalysisError("insert into functional.alltypes_view partition(year, month)
" +
"select * from functional.alltypes",
- "Impala does not support inserting into views:
functional.alltypes_view");
+ "Impala does not support INSERTing into views:
functional.alltypes_view");
// Cannot load into a view.
AnalysisError("load data inpath
'/test-warehouse/tpch.lineitem/lineitem.tbl' " +
"into table functional.alltypes_view",
@@ -2978,6 +2981,13 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
"'b.int_array_col' correlated with an outer block as well as an " +
"uncorrelated one 'functional.alltypestiny':\n" +
"SELECT item FROM b.int_array_col, functional.alltypestiny");
+
+ if (RuntimeEnv.INSTANCE.isKuduSupported()) {
+ // Key columns missing from permutation
+ AnalysisError("insert into functional_kudu.testtbl(zip) values(1)",
+ "All primary key columns must be specified for INSERTing into Kudu
tables. " +
+ "Missing columns are: id");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
----------------------------------------------------------------------
diff --git
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index fb2d63a..329be5c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -1163,6 +1163,14 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
"select * from functional.alltypestiny where id = (select 1) " +
"union select * from functional.alltypestiny where id = (select 2)");
+ // UPSERT
+ AnalyzesOk("upsert into functional_kudu.testtbl select * from " +
+ "functional_kudu.testtbl where id in (select id from
functional_kudu.testtbl " +
+ "where zip = 0)");
+ AnalyzesOk("upsert into functional_kudu.testtbl select * from " +
+ "functional_kudu.testtbl union select bigint_col, string_col, int_col
from " +
+ "functional.alltypes");
+
// CTAS with correlated subqueries
AnalyzesOk("create table functional.test_tbl as select * from " +
"functional.alltypes t where t.id in (select id from
functional.alltypesagg " +
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
----------------------------------------------------------------------
diff --git
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
new file mode 100644
index 0000000..118b322
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.junit.Test;
+
+import org.apache.impala.testutil.TestUtils;
+
+/**
+ * Analyzer tests for the UPSERT statement. UPSERT reuses INSERT's analysis path
+ * but is only accepted for Kudu tables.
+ */
+public class AnalyzeUpsertStmtTest extends AnalyzerTest {
+ @Test
+ public void TestUpsert() {
+ // Positive cases target functional_kudu tables; presumably this aborts the
+ // test via a JUnit assumption when the build has no Kudu support.
+ TestUtils.assumeKuduIsSupported();
+ // VALUES clause
+ AnalyzesOk("upsert into table functional_kudu.testtbl values(1, 'a', 1)");
+ AnalyzesOk("upsert into table functional_kudu.testtbl(id) values(1)");
+ AnalyzesOk("upsert into table functional_kudu.testtbl values(1, 'a', 1), "
+
+ "(2, 'b', 2), (3, 'c', 3)");
+ // SELECT clause
+ AnalyzesOk("upsert into functional_kudu.testtbl select bigint_col,
string_col, " +
+ "int_col from functional.alltypes");
+ // Permutation lists
+ AnalyzesOk("upsert into table functional_kudu.testtbl(id) select
bigint_col " +
+ "from functional.alltypes");
+ AnalyzesOk("upsert into table functional_kudu.testtbl(id, name) select
bigint_col, " +
+ "string_col from functional.alltypes");
+ AnalyzesOk("upsert into table functional_kudu.testtbl(name, zip, id)
select " +
+ "string_col, int_col, bigint_col from functional.alltypes");
+ // WITH clause
+ AnalyzesOk("with t1 as (select 1, 'a', 2) upsert into
functional_kudu.testtbl " +
+ "select * from t1");
+ AnalyzesOk("with t1 as (select * from functional.alltypes) upsert into " +
+ "functional_kudu.testtbl select bigint_col, string_col, int_col from
t1");
+ // WITH belonging to the select clause
+ AnalyzesOk("upsert into functional_kudu.testtbl with t1 as (select * from
" +
+ "functional.alltypes) select bigint_col, string_col, int_col from t1");
+ AnalyzesOk("upsert into functional_kudu.testtbl(id) with t1 as (select *
from " +
+ "functional.alltypes) select bigint_col from t1");
+ // Multiple WITH clauses
+ AnalyzesOk("with t1 as (select * from functional.alltypestiny) " +
+ "upsert into functional_kudu.testtbl with t2 as (select * from " +
+ "functional.alltypessmall) select bigint_col, string_col, int_col from
t1");
+ // Correlated inline view
+ AnalyzesOk("upsert into table functional_kudu.testtbl " +
+ "select a.id, string_col, b.month " +
+ "from functional.alltypes a, functional.allcomplextypes b, " +
+ "(select item from b.int_array_col) v1 " +
+ "where a.id = b.id");
+ // Hint
+ AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select *
from " +
+ "functional_kudu.testtbl");
+ // Incorrect hint, results in warning (the warning text says INSERT because
+ // UPSERT shares INSERT's hint handling)
+ AnalyzesOk("upsert into table functional_kudu.testtbl [badhint] select *
from " +
+ "functional_kudu.testtbl", "INSERT hint not recognized: badhint");
+
+ // Negative cases: each statement must fail analysis with the given message.
+ // Key columns missing from permutation
+ AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)",
+ "All primary key columns must be specified for UPSERTing into Kudu
tables. " +
+ "Missing columns are: id");
+ // SELECT clause with wrong number of columns
+ AnalysisError("upsert into functional_kudu.testtbl select * from
functional.alltypes",
+ "Target table 'functional_kudu.testtbl' has fewer columns (3) than the
SELECT " +
+ "/ VALUES clause returns (13)");
+ // VALUES clause with wrong number of columns
+ AnalysisError("upsert into functional_kudu.testtbl values(1)", "Target
table " +
+ "'functional_kudu.testtbl' has more columns (3) than the SELECT /
VALUES " +
+ "clause returns (1)");
+ // Permutation with wrong number of columns
+ AnalysisError("upsert into functional_kudu.testtbl(id, name, zip)
values(1)",
+ "Column permutation mentions more columns (3) than the SELECT / VALUES
" +
+ "clause returns (1)");
+ // Type mismatch
+ AnalysisError("upsert into functional_kudu.testtbl values(1, 1, 1)",
+ "Target table 'functional_kudu.testtbl' is incompatible with source " +
+ "expressions.\nExpression '1' (type: TINYINT) is not compatible with
column " +
+ "'name' (type: STRING)");
+ // Permutation with type mismatch
+ AnalysisError("upsert into functional_kudu.testtbl(zip, id, name) " +
+ "values('a', 'a', 'a')", "Target table 'functional_kudu.testtbl' is " +
+ "incompatible with source expressions.\nExpression ''a'' (type:
STRING) is not " +
+ "compatible with column 'zip' (type: INT)");
+ // Permutation with invalid column name
+ AnalysisError("upsert into functional_kudu.testtbl (id, name, invalid)
values " +
+ "(1, 'a', 1)", "Unknown column 'invalid' in column permutation");
+ // Permutation with repeated column
+ AnalysisError("upsert into functional_kudu.testtbl (id, name, zip, id)
values " +
+ "(1, 'a', 1, 1)", "Duplicate column 'id' in column permutation");
+ // UPSERT into non-Kudu table
+ AnalysisError("upsert into functional.alltypes select * from
functional.alltypes",
+ "UPSERT is only supported for Kudu tables");
+ // Unknown target DB
+ AnalysisError("upsert into UNKNOWNDB.testtbl select * " +
+ "from functional.alltypesnopart", "Database does not exist:
UNKNOWNDB");
+ // WITH-clause tables cannot be upserted into
+ AnalysisError("with t1 as (select 'a' x) upsert into t1 values('b' x)",
+ "Table does not exist: default.t1");
+ // Cannot upsert into a view
+ AnalysisError("upsert into functional.alltypes_view select * from " +
+ "functional.alltypes",
+ "Impala does not support UPSERTing into views:
functional.alltypes_view");
+ // Inline view that mixes a correlated table ref (b.int_array_col) with an
+ // uncorrelated one (functional.alltypestiny)
+ AnalysisError("upsert into table functional_kudu.testtbl " +
+ "select a.id, a.string_col, b.month " +
+ "from functional.alltypes a, functional.allcomplextypes b, " +
+ "(select item from b.int_array_col, functional.alltypestiny) v1 " +
+ "where a.id = b.id",
+ "Nested query is illegal because it contains a table reference " +
+ "'b.int_array_col' correlated with an outer block as well as an " +
+ "uncorrelated one 'functional.alltypestiny':\n" +
+ "SELECT item FROM b.int_array_col, functional.alltypestiny");
+ // Illegal complex-typed expr
+ AnalysisError("upsert into functional_kudu.testtbl " +
+ "select int_struct_col from functional.allcomplextypes",
+ "Expr 'int_struct_col' in select list returns a " +
+ "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
+ "Only scalar types are allowed in the select list.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 993f489..b9d1595 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -428,6 +428,10 @@ public class AnalyzerTest extends FrontendTestBase {
"select id from (select id+2 from functional_hbase.alltypessmall) a",
"Could not resolve column/field reference: 'id'");
+ // Analysis error from explain upsert
+ AnalysisError("explain upsert into table functional.alltypes select * from
" +
+ "functional.alltypes", "UPSERT is only supported for Kudu tables");
+
// Positive test for explain query
AnalyzesOk("explain select * from functional.AllTypes");
@@ -437,6 +441,10 @@ public class AnalyzerTest extends FrontendTestBase {
"select id, bool_col, tinyint_col, smallint_col, int_col, int_col, " +
"float_col, float_col, date_string_col, string_col, timestamp_col " +
"from functional.alltypes");
+
+ // Positive test for explain upsert
+ AnalyzesOk("explain upsert into table functional_kudu.testtbl select *
from " +
+ "functional_kudu.testtbl");
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index c163d70..092e146 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -393,6 +393,12 @@ public class ParserTest {
"insert overwrite t(a, b) partition(x, y) %sfoo,bar,baz%s select *
from t",
prefix, suffix), "foo", "bar", "baz");
+ // Test upsert hints.
+ ParsesOk(String.format("upsert into t %sshuffle%s select * from t",
prefix,
+ suffix));
+ ParsesOk(String.format("upsert into t (x, y) %sshuffle%s select * from
t", prefix,
+ suffix));
+
// Test TableRef hints.
TestTableHints(String.format(
"select * from functional.alltypes %sschedule_disk_local%s", prefix,
suffix),
@@ -761,6 +767,11 @@ public class ParserTest {
ParsesOk("insert overwrite table t select a from test union select a from
test " +
"union select a from test union select a from test");
+ // Union in upsert query.
+ ParsesOk("upsert into table t select a from test union select a from
test");
+ ParsesOk("upsert into table t select a from test union select a from test
" +
+ "union select a from test union select a from test");
+
// No complete select statement on lhs.
ParserError("a from test union select a from test");
// No complete select statement on rhs.
@@ -777,12 +788,14 @@ public class ParserTest {
ParsesOk("select * from (values(1, 'a', abc, 1.0, *)) as t");
ParsesOk("values(1, 'a', abc, 1.0, *) union all values(1, 'a', abc, 1.0,
*)");
ParsesOk("insert into t values(1, 'a', abc, 1.0, *)");
+ ParsesOk("upsert into t values(1, 'a', abc, 1.0, *)");
// Values stmt with multiple rows.
ParsesOk("values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
ParsesOk("select * from (values(1, abc), ('x', cde), (2), (efg, fgh, ghi))
as t");
ParsesOk("values(1, abc), ('x', cde), (2), (efg, fgh, ghi) " +
"union all values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
ParsesOk("insert into t values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
+ ParsesOk("upsert into t values(1, abc), ('x', cde), (2), (efg, fgh, ghi)");
// Test additional parenthesis.
ParsesOk("(values(1, abc), ('x', cde), (2), (efg, fgh, ghi))");
ParsesOk("values((1, abc), ('x', cde), (2), (efg, fgh, ghi))");
@@ -838,6 +851,14 @@ public class ParserTest {
// With clause before insert statement.
ParsesOk("with t as (select 1) insert into x select * from t");
ParsesOk("with t(x) as (select 1) insert into x select * from t");
+ // With clause in query statement of upsert statement.
+ ParsesOk("upsert into x with t as (select * from tab) select * from t");
+ ParsesOk("upsert into x with t(x, y) as (select * from tab) select * from
t");
+ ParsesOk("upsert into x with t as (values(1, 2, 3)) select * from t");
+ ParsesOk("upsert into x with t(x, y) as (values(1, 2, 3)) select * from
t");
+ // With clause before upsert statement.
+ ParsesOk("with t as (select 1) upsert into x select * from t");
+ ParsesOk("with t(x) as (select 1) upsert into x select * from t");
// Test quoted identifier or string literal as table alias.
ParsesOk("with `t1` as (select 1 a), 't2' as (select 2 a), \"t3\" as
(select 3 a)" +
@@ -854,6 +875,10 @@ public class ParserTest {
ParsesOk("with t as (select 1) insert into x with t as (select 2) select *
from t");
ParsesOk("with t(c1) as (select 1) " +
"insert into x with t(c2) as (select 2) select * from t");
+ // Multiple with clauses. One before the upsert and one inside the query
statement.
+ ParsesOk("with t as (select 1) upsert into x with t as (select 2) select *
from t");
+ ParsesOk("with t(c1) as (select 1) " +
+ "upsert into x with t(c2) as (select 2) select * from t");
// Empty with clause.
ParserError("with t as () select 1");
@@ -873,6 +898,9 @@ public class ParserTest {
// Insert in with clause is not valid.
ParserError("with t as (insert into x select * from tab) select * from t");
ParserError("with t(c1) as (insert into x select * from tab) select * from
t");
+ // Upsert in with clause is not valid.
+ ParserError("with t as (upsert into x select * from tab) select * from t");
+ ParserError("with t(c1) as (upsert into x select * from tab) select * from
t");
// Union operands need to be parenthesized to have their own with clause.
ParserError("select * from t union all with t as (select 2) select * from
t");
}
@@ -966,7 +994,7 @@ public class ParserTest {
ParsesOk("select a from `abc\u007fabc`");
// Quoted identifiers can contain keywords.
- ParsesOk("select `select`, `insert` from `table` where `where` = 10");
+ ParsesOk("select `select`, `insert`, `upsert` from `table` where `where` =
10");
// Quoted identifiers cannot contain "`"
ParserError("select a from `abcde`abcde`");
@@ -1610,6 +1638,47 @@ public class ParserTest {
}
@Test
+ public void TestUpsert() {
+ // Every case is checked both with and without the optional TABLE keyword.
+ for (String optTbl: new String[] {"", "table"}) {
+ // SELECT clause
+ ParsesOk(String.format("upsert into %s t select a from src", optTbl));
+ // VALUES clause
+ ParsesOk(String.format("upsert into %s t values (1, 2, 3)", optTbl));
+ // Permutation
+ ParsesOk(String.format("upsert into %s t (a,b,c) values(1,2,3)",
optTbl));
+ // Permutation with mismatched select list (should parse fine)
+ ParsesOk(String.format("upsert into %s t (a,b,c) values(1,2,3,4,5,6)",
optTbl));
+ // Empty permutation list
+ ParsesOk(String.format("upsert into %s t () select 1 from a", optTbl));
+ // Permutation with optional query statement
+ ParsesOk(String.format("upsert into %s t () ", optTbl));
+ // WITH clause
+ ParsesOk(String.format("with x as (select a from src where b > 5) upsert
into %s " +
+ "t select * from x", optTbl));
+
+ // Missing query statement
+ ParserError(String.format("upsert into %s t", optTbl));
+ // Missing 'into'.
+ ParserError(String.format("upsert %s t select a from src where b > 5",
optTbl));
+ // Missing target table identifier.
+ ParserError(String.format("upsert into %s select a from src where b >
5", optTbl));
+ // The permutation-list cases below keep INTO so that they fail on the
+ // permutation list itself, not on the missing-INTO error covered above.
+ // No comma in permutation list
+ ParserError(String.format("upsert into %s t(a b c) select 1 from a", optTbl));
+ // Can't use strings as identifiers in permutation list
+ ParserError(String.format("upsert into %s t('a') select 1 from a", optTbl));
+ // Expressions not allowed in permutation list
+ ParserError(String.format("upsert into %s t(a=1, b) select 1 from a",
optTbl));
+ // Upsert doesn't support ignore.
+ ParserError(String.format("upsert ignore into %s t select a from src",
optTbl));
+ // Upsert doesn't support partition clauses.
+ ParserError(String.format(
+ "upsert into %s t partition (pk1=10) select a from src", optTbl));
+ // Upsert doesn't support overwrite.
+ ParserError(String.format("upsert overwrite %s t select 1 from src",
optTbl));
+ }
+ }
+
+ @Test
public void TestUpdate() {
ParsesOk("update t set x = 3 where a < b");
ParsesOk("update t set x = (3 + length(\"hallo\")) where a < 'adasas'");
@@ -2512,6 +2581,7 @@ public class ParserTest {
ParserError("CREATE VIEW Foo.Bar (x) AS");
// Invalid view definitions. A view definition must be a query statement.
ParserError("CREATE VIEW Foo.Bar (x) AS INSERT INTO t select * from t");
+ ParserError("CREATE VIEW Foo.Bar (x) AS UPSERT INTO t select * from t");
ParserError("CREATE VIEW Foo.Bar (x) AS CREATE TABLE Wrong (i int)");
ParserError("CREATE VIEW Foo.Bar (x) AS ALTER TABLE Foo COLUMNS (i int, s
string)");
ParserError("CREATE VIEW Foo.Bar (x) AS CREATE VIEW Foo.Bar AS SELECT 1");
@@ -2541,6 +2611,7 @@ public class ParserTest {
ParserError("ALTER VIEW Foo.Bar AS");
// Invalid view definitions. A view definition must be a query statement.
ParserError("ALTER VIEW Foo.Bar AS INSERT INTO t select * from t");
+ ParserError("ALTER VIEW Foo.Bar AS UPSERT INTO t select * from t");
ParserError("ALTER VIEW Foo.Bar AS CREATE TABLE Wrong (i int)");
ParserError("ALTER VIEW Foo.Bar AS ALTER TABLE Foo COLUMNS (i int, s
string)");
ParserError("ALTER VIEW Foo.Bar AS CREATE VIEW Foo.Bar AS SELECT 1, 2, 3");
@@ -2571,8 +2642,9 @@ public class ParserTest {
ParserError("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUET AS
WITH");
ParserError("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUET AS");
- // INSERT statements are not allowed
+ // INSERT/UPSERT statements are not allowed
ParserError("CREATE TABLE Foo AS INSERT INTO Foo SELECT 1");
+ ParserError("CREATE TABLE Foo AS UPSERT INTO Foo SELECT 1");
// Column and partition definitions not allowed
ParserError("CREATE TABLE Foo(i int) AS SELECT 1");
@@ -2840,7 +2912,7 @@ public class ParserTest {
"Encountered: IDENTIFIER\n" +
"Expected: ALTER, COMPUTE, CREATE, DELETE, DESCRIBE, DROP, EXPLAIN,
GRANT, " +
"INSERT, INVALIDATE, LOAD, REFRESH, REVOKE, SELECT, SET, SHOW,
TRUNCATE, " +
- "UPDATE, USE, VALUES, WITH\n");
+ "UPDATE, UPSERT, USE, VALUES, WITH\n");
// missing select list
ParserError("select from t",
@@ -2981,6 +3053,7 @@ public class ParserTest {
public void TestExplain() {
ParsesOk("explain select a from tbl");
ParsesOk("explain insert into tbl select a, b, c, d from tbl");
+ ParsesOk("explain upsert into tbl select a, b, c, d from tbl");
ParserError("explain");
// cannot EXPLAIN an explain stmt
ParserError("explain explain select a from tbl");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 371e811..b5cf446 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -655,6 +655,8 @@ public class ToSqlTest extends FrontendTestBase {
"values(1, true, 1, 1, 10, 10, 10.0, 10.0, 'a', 'a', cast (0 as
timestamp))",
"INSERT INTO TABLE functional.alltypessmall PARTITION (year=2009,
month=4) " +
"VALUES(1, TRUE, 1, 1, 10, 10, 10.0, 10.0, 'a', 'a', CAST(0 AS
TIMESTAMP))");
+ testToSql("upsert into table functional_kudu.testtbl values(1, 'a', 1)",
+ "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)");
}
/**
@@ -903,6 +905,11 @@ public class ToSqlTest extends FrontendTestBase {
"WITH t1 AS (SELECT * FROM functional.alltypes) " +
"INSERT INTO TABLE functional.alltypes PARTITION (year, month) " +
"SELECT * FROM t1");
+ // WITH clause in upsert stmt.
+ testToSql("with t1 as (select * from functional.alltypes) upsert into " +
+ "functional_kudu.testtbl select bigint_col, string_col, int_col from
t1",
+ "WITH t1 AS (SELECT * FROM functional.alltypes) UPSERT INTO TABLE " +
+ "functional_kudu.testtbl SELECT bigint_col, string_col, int_col FROM
t1");
// Test joins in WITH-clause view.
testToSql("with t as (select a.* from functional.alltypes a, " +
"functional.alltypes b where a.id = b.id) select * from t",
@@ -1016,6 +1023,29 @@ public class ToSqlTest extends FrontendTestBase {
}
@Test
+ public void upsertTest() {
+ // toSql() of an UPSERT always spells out the optional TABLE keyword, so inputs
+ // written without it still round-trip to "UPSERT INTO TABLE ...".
+ // All cases target functional_kudu tables, so skip like the other Kudu tests
+ // (fully qualified to avoid depending on this file's import list).
+ org.apache.impala.testutil.TestUtils.assumeKuduIsSupported();
+ // VALUES clause
+ testToSql("upsert into functional_kudu.testtbl values (1, 'a', 1)",
+ "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)");
+
+ // SELECT clause
+ testToSql("upsert into functional_kudu.testtbl select bigint_col,
string_col, " +
+ "int_col from functional.alltypes", "UPSERT INTO TABLE
functional_kudu.testtbl " +
+ "SELECT bigint_col, string_col, int_col FROM functional.alltypes");
+
+ // WITH clause
+ testToSql("with x as (select bigint_col, string_col, int_col from " +
+ "functional.alltypes) upsert into table functional_kudu.testtbl select
* from x",
+ "WITH x AS (SELECT bigint_col, string_col, int_col FROM
functional.alltypes) " +
+ "UPSERT INTO TABLE functional_kudu.testtbl SELECT * FROM x");
+
+ // Permutation
+ testToSql("upsert into table functional_kudu.testtbl (zip, id, name)
values " +
+ "(1, 1, 'a')", "UPSERT INTO TABLE functional_kudu.testtbl(zip, id,
name) " +
+ "VALUES(1, 1, 'a')");
+ }
+
+ @Test
public void testAnalyticExprs() {
testToSql(
"select sum(int_col) over (partition by id order by tinyint_col "
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 6250969..91035d1 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -254,6 +254,12 @@ public class PlannerTest extends PlannerTestBase {
}
@Test
+ public void testKuduUpsert() {
+ // Planner test for UPSERT INTO Kudu tables (kudu-upsert.test); skipped via a
+ // JUnit assumption when Kudu is not supported in this runtime environment.
+ Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
+ runPlannerTestFile("kudu-upsert");
+ }
+
+ @Test
public void testKuduUpdate() {
Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
runPlannerTestFile("kudu-update");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
new file mode 100644
index 0000000..b106b02
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -0,0 +1,92 @@
+# simple upsert with select
+upsert into table functional_kudu.testtbl
+select bigint_col, string_col, int_col from functional.alltypes
+where year=2009 and month=05
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=1/24 files=1 size=20.36KB
+====
+# simple upsert with values clause
+upsert into table functional_kudu.testtbl
+values (1, 'a', 1), (2, 'b', 2)
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+00:UNION
+ constant-operands=2
+====
+# upsert with 'with' clause and join
+with x as (select string_col, count(*) from functional.alltypes group by
string_col)
+upsert into table functional_kudu.testtbl
+select a.bigint_col, a.string_col, a.int_col from functional.alltypes a, x
+where x.string_col = a.string_col
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+03:HASH JOIN [INNER JOIN]
+| hash predicates: a.string_col = string_col
+| runtime filters: RF000 <- string_col
+|
+|--02:AGGREGATE [FINALIZE]
+| | group by: string_col
+| |
+| 01:SCAN HDFS [functional.alltypes]
+| partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+ partitions=24/24 files=24 size=478.45KB
+ runtime filters: RF000 -> a.string_col
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+| hash predicates: a.string_col = string_col
+| runtime filters: RF000 <- string_col
+|
+|--06:EXCHANGE [BROADCAST]
+| |
+| 05:AGGREGATE [FINALIZE]
+| | group by: string_col
+| |
+| 04:EXCHANGE [HASH(string_col)]
+| |
+| 02:AGGREGATE [STREAMING]
+| | group by: string_col
+| |
+| 01:SCAN HDFS [functional.alltypes]
+| partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+ partitions=24/24 files=24 size=478.45KB
+ runtime filters: RF000 -> a.string_col
+====
+# upsert with inline view
+upsert into functional_kudu.testtbl
+select v.id, v.string_col, v.cnt from (
+ select id, string_col, cast(count(*) as int) cnt from
+ functional.alltypes
+ group by 1, 2) v
+where cnt < 10
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+01:AGGREGATE [FINALIZE]
+| output: count(*)
+| group by: id, string_col
+| having: CAST(count(*) AS INT) < 10
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+====
+upsert into functional_kudu.testtbl /*+ clustered */
+select * from functional_kudu.testtbl
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+01:SORT
+| order by: id DESC NULLS LAST
+|
+00:SCAN KUDU [functional_kudu.testtbl]
+====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/832fb537/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index 2782908..949a25c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -261,6 +261,75 @@ delete ignore a from tdata a, tdata b where a.id = 666
row_regex: .*NumModifiedRows: 1.*
====
---- QUERY
+select * from tdata
+---- RESULTS
+40,'he',0,43,'e',false
+120,'she',0,99,'f',true
+320,'',2,932,'',false
+1,'unknown',1,43,'aaaaaaaaaaaaaaaaaaaa',false
+2,'david',1,43,'b',false
+3,'todd',1,43,'c',true
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata values (40, 'they', 1, 43, cast('e' as VARCHAR(20)),
false),
+(1, NULL, 1, 0, cast('a' as VARCHAR(20)), true)
+---- RESULTS
+====
+---- QUERY
+select * from tdata
+---- RESULTS
+40,'they',1,43,'e',false
+120,'she',0,99,'f',true
+320,'',2,932,'',false
+1,'NULL',1,0,'a',true
+2,'david',1,43,'b',false
+3,'todd',1,43,'c',true
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata (id, valf) values (2, NULL), (120, 20), (0, 0)
+---- RESULTS
+====
+---- QUERY
+select * from tdata
+---- RESULTS
+40,'they',1,43,'e',false
+120,'she',20,99,'f',true
+320,'',2,932,'',false
+1,'NULL',1,0,'a',true
+2,'david',NULL,43,'b',false
+3,'todd',1,43,'c',true
+0,'NULL',0,NULL,'NULL',NULL
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata (valb, name, id)
+select false as valb, 'he' as name, id from tdata where id < 2
+---- RESULTS
+====
+---- QUERY
+select * from tdata
+---- RESULTS
+40,'they',1,43,'e',false
+120,'she',20,99,'f',true
+320,'',2,932,'',false
+1,'he',1,0,'a',false
+2,'david',NULL,43,'b',false
+3,'todd',1,43,'c',true
+0,'he',0,NULL,'NULL',false
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
+====
+---- QUERY
+upsert into table tdata (id, name) values (null, '')
+---- CATCH
+Could not add Kudu WriteOp.: Invalid argument: column not nullable: id[int32
NOT NULL]
+====
+---- QUERY
# IMPALA-3454: A delete that requires a rewrite may not get the Kudu column
order correct
# if the Kudu columns are of different types.
create table impala_3454 (key_1 tinyint, key_2 bigint, PRIMARY KEY (key_1,
key_2))