IMPALA-3710: Kudu DML should ignore conflicts by default

Removes the non-standard IGNORE syntax that was allowed for DML into
Kudu tables to indicate that certain errors should be ignored, i.e. that
the query should continue rather than fail. Because there is no way to
'roll back' mutations that occurred before an error, a query that fails
on such an error leaves the table in an inconsistent state, and it is
difficult to know which rows were successfully modified and which were
not. Instead, this change makes DML always 'ignore' these conflicts,
i.e. it is 'best effort'. In the future, when Kudu provides the
mechanisms Impala needs to offer a notion of isolation levels, Impala
will be able to provide options for more traditional semantics.
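The 'best effort' rule can be sketched in isolation as follows. This is a minimal illustration, not the code in this patch: SinkAction, ErrorKind, RowError, IsIgnoredConflict and FirstFatalError are hypothetical stand-ins, whereas the real check lives in KuduTableSink::CheckForErrors and queries kudu::Status via IsNotFound() and IsAlreadyPresent(). UPSERT is left out of the sketch.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration only; not Impala or Kudu types.
enum class SinkAction { INSERT, UPDATE, DELETE };
enum class ErrorKind { ALREADY_PRESENT, NOT_FOUND, OTHER };

struct RowError {
  ErrorKind kind;
  std::string message;
};

// Returns true if the error is a primary-key conflict that should not fail
// the query: a duplicate key on INSERT, or a missing key on UPDATE/DELETE.
bool IsIgnoredConflict(SinkAction action, ErrorKind kind) {
  switch (action) {
    case SinkAction::INSERT:
      return kind == ErrorKind::ALREADY_PRESENT;
    case SinkAction::UPDATE:
    case SinkAction::DELETE:
      return kind == ErrorKind::NOT_FOUND;
  }
  return false;
}

// Returns a message built from the first non-ignored error, or an empty
// string if every reported error was an ignorable conflict.
std::string FirstFatalError(SinkAction action,
                            const std::vector<RowError>& errors) {
  for (const RowError& e : errors) {
    if (!IsIgnoredConflict(action, e.kind)) {
      return "Kudu error(s) reported, first error: " + e.message;
    }
  }
  return "";
}
```

Under these assumptions, an INSERT that hits only duplicate keys yields an empty result (the query succeeds), while any other error kind is still surfaced as the query's failure status.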
After this change, the following errors are ignored:
* INSERT where the PK already exists
* UPDATE/DELETE where the PK doesn't exist

Another follow-up patch will change other violations to be handled in
this way as well, e.g. nulls inserted in non-nullable cols.

Reporting:
The number of rows inserted is reported to the coordinator, which makes
the aggregate available to the shell and via the profile.

TODO: Return rows modified for INSERT via HS2 (IMPALA-1789).
TODO: Return rows modified for other CRUD (beeswax+hs2) (IMPALA-3713).
TODO: Return error counts for specific warnings (IMPALA-4416).

Testing:
Updated tests. Ran all functional tests. More tests will be needed when
other conflicts are handled in the same way.

Change-Id: I83b5beaa982d006da4997a2af061ef7c22cad3f1
Reviewed-on: http://gerrit.cloudera.org:8080/4911
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/08d89a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/08d89a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/08d89a5c

Branch: refs/heads/hadoop-next
Commit: 08d89a5cc3d2135896a7d4518dac7d22e5e66ddf
Parents: ce4c5f6
Author: Matthew Jacobs <[email protected]>
Authored: Tue Nov 1 17:52:21 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Tue Nov 8 20:34:00 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc                  | 11 ++---
 be/src/exec/kudu-table-sink.h                   | 13 ++---
 common/thrift/DataSinks.thrift                  |  3 --
 fe/src/main/cup/sql-parser.cup                  | 51 ++++++++------------
 .../analysis/CreateTableAsSelectStmt.java       |  2 +-
 .../org/apache/impala/analysis/DeleteStmt.java  | 14 +++---
 .../org/apache/impala/analysis/InsertStmt.java  | 21 +++-----
 .../org/apache/impala/analysis/ModifyStmt.java  |  7 +--
 .../org/apache/impala/analysis/UpdateStmt.java  | 21 ++++----
 .../apache/impala/planner/KuduTableSink.java    | 19 ++------
 .../org/apache/impala/planner/TableSink.java    | 11 ++---
 .../impala/analysis/AnalyzeModifyStmtsTest.java |  2 -
 .../org/apache/impala/analysis/ParserTest.java  |  6 ---
 .../queries/PlannerTest/kudu-delete.test        | 18 -------
 .../queries/PlannerTest/kudu-update.test        | 14 ------
 .../queries/PlannerTest/kudu.test               | 10 ----
 .../queries/QueryTest/kudu_crud.test            | 43 ++---------------
 17 files changed, 66 insertions(+), 200 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 14b1889..a58baab 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -292,13 +292,10 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
   // them accordingly.
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
-    // If the sink has the option "ignore_not_found_or_duplicate" set, duplicate key or
-    // key already present errors from Kudu in INSERT, UPDATE, or DELETE operations will
-    // be ignored.
-    if (!kudu_table_sink_.ignore_not_found_or_duplicate ||
-        ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
-        (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
-        (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent()))) {
+    // 'Duplicate key' or 'key already present' errors from Kudu do not fail the query.
+    if ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
+        (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
+        (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent())) {
       if (status.ok()) {
         status = Status(strings::Substitute(
             "Kudu error(s) reported, first error: $0", e.ToString()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index c37b15d..adf264e 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -45,12 +45,13 @@ namespace impala {
 ///
 /// Kudu doesn't have transactions yet, so some rows may fail to write while others are
 /// successful. The Kudu client reports errors, some of which may be considered to be
-/// expected: rows that fail to be written/updated/deleted due to a key conflict while
-/// the IGNORE option is specified, and these will not result in the sink returning an
-/// error. These errors when IGNORE is not specified, or any other kind of error
-/// reported by Kudu result in the sink returning an error status. The first non-ignored
-/// error is returned in the sink's Status. All reported errors (ignored or not) will be
-/// logged via the RuntimeState.
+/// expected: rows that fail to be written/updated/deleted due to a key conflict will not
+/// result in the sink returning an error. Any other kind of error reported by Kudu
+/// results in the sink returning an error status. The first non-ignored error is
+/// returned in the sink's Status. All reported errors (ignored or not) will be logged
+/// via the RuntimeState.
+/// TODO: Handle other data/constraint-violation errors as ignored, e.g. null in
+/// non-nullable col
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 26f74cf..2a57304 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -74,9 +74,6 @@ struct TKuduTableSink {
   // sink and holds the index of the corresponsding column in the Kudu schema,
   // e.g. 'exprs[i]' references 'kudu_table.column(referenced_cols[i])'
   1: optional list<i32> referenced_columns;
-
-  // Defines if duplicate or not found keys should be ignored
-  2: optional bool ignore_not_found_or_duplicate;
 }
 
 // Sink to create the build side of a JoinNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index c0865c4..57d5a24 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -479,7 +479,6 @@ nonterminal CreateUdaStmt create_uda_stmt;
 nonterminal ShowFunctionsStmt show_functions_stmt;
 nonterminal DropFunctionStmt drop_function_stmt;
 nonterminal TFunctionCategory opt_function_category;
-nonterminal Boolean opt_ignore;
 
 precedence left KW_OR;
 precedence left KW_AND;
@@ -677,47 +676,44 @@ explain_stmt ::=
 // tbl(col1,...) etc) and the PARTITION clause. If the column permutation is present, the
 // query statement clause is optional as well.
 insert_stmt ::=
-  opt_with_clause:w KW_INSERT KW_OVERWRITE opt_ignore:ignore opt_kw_table table_name:table
+  opt_with_clause:w KW_INSERT KW_OVERWRITE opt_kw_table table_name:table
   LPAREN opt_ident_list:col_perm RPAREN partition_clause:list opt_plan_hints:hints
   opt_query_stmt:query
   {:
-    RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, col_perm,
-      ignore);
+    RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, col_perm);
   :}
   | opt_with_clause:w KW_INSERT KW_OVERWRITE
-  opt_ignore:ignore opt_kw_table table_name:table
+  opt_kw_table table_name:table
   partition_clause:list opt_plan_hints:hints query_stmt:query
   {:
-    RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, null, ignore);
+    RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, null);
   :}
-  | opt_with_clause:w KW_INSERT opt_ignore:ignore KW_INTO
-  opt_kw_table table_name:table LPAREN opt_ident_list:col_perm RPAREN
+  | opt_with_clause:w KW_INSERT KW_INTO opt_kw_table table_name:table
+  LPAREN opt_ident_list:col_perm RPAREN
   partition_clause:list opt_plan_hints:hints opt_query_stmt:query
   {:
-    RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, col_perm,
-      ignore);
+    RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, col_perm);
   :}
-  | opt_with_clause:w KW_INSERT
-  opt_ignore:ignore KW_INTO opt_kw_table table_name:table partition_clause:list
-  opt_plan_hints:hints query_stmt:query
+  | opt_with_clause:w KW_INSERT KW_INTO opt_kw_table table_name:table
+  partition_clause:list opt_plan_hints:hints query_stmt:query
   {:
-    RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, null, ignore);
+    RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, null);
   :}
   ;
 
 // Update statements have an optional WHERE and optional FROM clause.
 update_stmt ::=
-  KW_UPDATE opt_ignore:ignore dotted_path:target_table KW_SET update_set_expr_list:values
+  KW_UPDATE dotted_path:target_table KW_SET update_set_expr_list:values
   where_clause:where_predicate
   {:
     FromClause from_clause = new FromClause(
       Lists.newArrayList(new TableRef(target_table, null)));
-    RESULT = new UpdateStmt(target_table, from_clause, values, where_predicate, ignore);
+    RESULT = new UpdateStmt(target_table, from_clause, values, where_predicate);
   :}
-  | KW_UPDATE opt_ignore:ignore dotted_path:target_table
+  | KW_UPDATE dotted_path:target_table
   KW_SET update_set_expr_list:values
   from_clause:tables where_clause:where_predicate
-  {: RESULT = new UpdateStmt(target_table, tables, values, where_predicate, ignore); :}
+  {: RESULT = new UpdateStmt(target_table, tables, values, where_predicate); :}
   ;
 
 update_set_expr_list ::=
@@ -750,21 +746,21 @@ upsert_stmt ::=
 // keyword followed by a table alias or reference and a full FROM clause. In all cases
 // a WHERE clause may be present.
 delete_stmt ::=
-  KW_DELETE opt_ignore:ignore dotted_path:target_table where_clause:where
+  KW_DELETE dotted_path:target_table where_clause:where
   {:
     FromClause from_clause = new FromClause(
       Lists.newArrayList(new TableRef(target_table, null)));
-    RESULT = new DeleteStmt(target_table, from_clause, where, ignore);
+    RESULT = new DeleteStmt(target_table, from_clause, where);
   :}
-  | KW_DELETE opt_ignore:ignore KW_FROM dotted_path:target_table where_clause:where
+  | KW_DELETE KW_FROM dotted_path:target_table where_clause:where
   {:
     FromClause from_clause = new FromClause(
       Lists.newArrayList(new TableRef(target_table, null)));
-    RESULT = new DeleteStmt(target_table, from_clause, where, ignore);
+    RESULT = new DeleteStmt(target_table, from_clause, where);
   :}
-  | KW_DELETE opt_ignore:ignore dotted_path:target_table from_clause:from
+  | KW_DELETE dotted_path:target_table from_clause:from
   where_clause:where
-  {: RESULT = new DeleteStmt(target_table, from, where, ignore); :}
+  {: RESULT = new DeleteStmt(target_table, from, where); :}
   ;
 
 opt_query_stmt ::=
@@ -786,13 +782,6 @@ opt_kw_table ::=
   | /* empty */
   ;
 
-opt_ignore ::=
-  KW_IGNORE
-  {: RESULT = true; :}
-  | /* empty */
-  {: RESULT = false; :}
-  ;
-
 show_roles_stmt ::=
   KW_SHOW KW_ROLES
   {: RESULT = new ShowRolesStmt(false, null); :}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 2eb399f..1e53d1e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -81,7 +81,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
       }
     }
     insertStmt_ = InsertStmt.createInsert(
-        null, createStmt.getTblName(), false, pkvs, null, queryStmt, null, false);
+        null, createStmt.getTblName(), false, pkvs, null, queryStmt, null);
   }
 
   public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 62606b8..9d84baa 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -21,12 +21,11 @@ import java.util.List;
 
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.KuduTableSink;
 import org.apache.impala.planner.TableSink;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.client.Delete;
 
 /**
  * Representation of a DELETE statement.
@@ -42,22 +41,22 @@ import org.apache.hadoop.hbase.client.Delete;
 public class DeleteStmt extends ModifyStmt {
 
   public DeleteStmt(List<String> targetTablePath, FromClause tableRefs,
-      Expr wherePredicate, boolean ignoreNotFound) {
+      Expr wherePredicate) {
     super(targetTablePath, tableRefs, Lists.<Pair<SlotRef, Expr>>newArrayList(),
-        wherePredicate, ignoreNotFound);
+        wherePredicate);
   }
 
   public DeleteStmt(DeleteStmt other) {
     super(other.targetTablePath_, other.fromClause_.clone(),
-        Lists.<Pair<SlotRef, Expr>>newArrayList(), other.wherePredicate_.clone(),
-        other.ignoreNotFound_);
+        Lists.<Pair<SlotRef, Expr>>newArrayList(), other.wherePredicate_.clone());
   }
 
+  @Override
   public DataSink createDataSink() {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        ImmutableList.<Expr>of(), referencedColumns_, false, ignoreNotFound_);
+        ImmutableList.<Expr>of(), referencedColumns_, false);
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return tableSink;
   }
@@ -71,7 +70,6 @@ public class DeleteStmt extends ModifyStmt {
   public String toSql() {
     StringBuilder b = new StringBuilder();
     b.append("DELETE");
-    if (ignoreNotFound_) b.append(" IGNORE");
     if (fromClause_.size() > 1 || targetTableRef_.hasExplicitAlias()) {
       b.append(" ");
       if (targetTableRef_.hasExplicitAlias()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index da360b1..55005f9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -146,32 +146,26 @@ public class InsertStmt extends StatementBase {
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
-  // For tables with primary keys, indicates if duplicate key errors are ignored.
-  private final boolean ignoreDuplicates_;
-
   // True iff this is an UPSERT operation. Only supported for Kudu tables.
   private final boolean isUpsert_;
 
   public static InsertStmt createInsert(WithClause withClause, TableName targetTable,
       boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
-      List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation,
-      boolean ignoreDuplicates) {
+      List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
     return new InsertStmt(withClause, targetTable, overwrite, partitionKeyValues,
-        planHints, queryStmt, columnPermutation, ignoreDuplicates, false);
+        planHints, queryStmt, columnPermutation, false);
   }
 
   public static InsertStmt createUpsert(WithClause withClause, TableName targetTable,
       List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
     return new InsertStmt(withClause, targetTable, false, null, planHints, queryStmt,
-        columnPermutation, false, true);
+        columnPermutation, true);
   }
 
   protected InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite,
       List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
-      QueryStmt queryStmt, List<String> columnPermutation, boolean ignoreDuplicates,
-      boolean isUpsert) {
-    Preconditions.checkState(!isUpsert || (!overwrite && !ignoreDuplicates &&
-        partitionKeyValues == null));
+      QueryStmt queryStmt, List<String> columnPermutation, boolean isUpsert) {
+    Preconditions.checkState(!isUpsert || (!overwrite && partitionKeyValues == null));
     withClause_ = withClause;
     targetTableName_ = targetTable;
     originalTableName_ = targetTableName_;
@@ -182,7 +176,6 @@ public class InsertStmt extends StatementBase {
     needsGeneratedQueryStatement_ = (queryStmt == null);
     columnPermutation_ = columnPermutation;
     table_ = null;
-    ignoreDuplicates_ = ignoreDuplicates;
     isUpsert_ = isUpsert;
   }
 
@@ -201,7 +194,6 @@ public class InsertStmt extends StatementBase {
     needsGeneratedQueryStatement_ = other.needsGeneratedQueryStatement_;
     columnPermutation_ = other.columnPermutation_;
     table_ = other.table_;
-    ignoreDuplicates_ = other.ignoreDuplicates_;
     isUpsert_ = other.isUpsert_;
   }
 
@@ -789,7 +781,7 @@ public class InsertStmt extends StatementBase {
     Preconditions.checkState(table_ != null);
     Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty());
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedUpsertColumns_, overwrite_, ignoreDuplicates_);
+        partitionKeyExprs_, mentionedUpsertColumns_, overwrite_);
   }
 
   /**
@@ -813,7 +805,6 @@ public class InsertStmt extends StatementBase {
     if (overwrite_) {
       strBuilder.append("OVERWRITE ");
     } else {
-      if (ignoreDuplicates_) strBuilder.append("IGNORE ");
       strBuilder.append("INTO ");
     }
     strBuilder.append("TABLE " + originalTableName_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 00f99a5..0107079 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -92,17 +92,12 @@ public abstract class ModifyStmt extends StatementBase {
   // position in the target table. Set in createSourceStmt() during analysis.
   protected ArrayList<Integer> referencedColumns_;
 
-  // On tables with a primary key, ignore key not found errors.
-  protected final boolean ignoreNotFound_;
-
   public ModifyStmt(List<String> targetTablePath, FromClause fromClause,
-      List<Pair<SlotRef, Expr>> assignmentExprs,
-      Expr wherePredicate, boolean ignoreNotFound) {
+      List<Pair<SlotRef, Expr>> assignmentExprs, Expr wherePredicate) {
     targetTablePath_ = Preconditions.checkNotNull(targetTablePath);
     fromClause_ = Preconditions.checkNotNull(fromClause);
     assignments_ = Preconditions.checkNotNull(assignmentExprs);
     wherePredicate_ = wherePredicate;
-    ignoreNotFound_ = ignoreNotFound;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 8b0d96d..9a1bc9e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -17,18 +17,18 @@
 
 package org.apache.impala.analysis;
 
+import static java.lang.String.format;
+
 import java.util.List;
 
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.KuduTableSink;
 import org.apache.impala.planner.TableSink;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-import static java.lang.String.format;
-
 /**
  * Representation of an Update statement.
  *
@@ -47,26 +47,25 @@ import static java.lang.String.format;
  * Currently, only Kudu tables can be updated.
  */
 public class UpdateStmt extends ModifyStmt {
-  public UpdateStmt(List<String> targetTablePath, FromClause tableRefs,
-      List<Pair<SlotRef, Expr>> assignmentExprs, Expr wherePredicate,
-      boolean ignoreNotFound) {
-    super(targetTablePath, tableRefs, assignmentExprs, wherePredicate, ignoreNotFound);
+  public UpdateStmt(List<String> targetTablePath, FromClause tableRefs,
+      List<Pair<SlotRef, Expr>> assignmentExprs, Expr wherePredicate) {
+    super(targetTablePath, tableRefs, assignmentExprs, wherePredicate);
   }
 
   public UpdateStmt(UpdateStmt other) {
     super(other.targetTablePath_, other.fromClause_.clone(),
-        Lists.<Pair<SlotRef, Expr>>newArrayList(), other.wherePredicate_,
-        other.ignoreNotFound_);
+        Lists.<Pair<SlotRef, Expr>>newArrayList(), other.wherePredicate_);
   }
 
   /**
    * Return an instance of a KuduTableSink specialized as an Update operation.
    */
+  @Override
   public DataSink createDataSink() {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), referencedColumns_, false, ignoreNotFound_);
+        ImmutableList.<Expr>of(), referencedColumns_, false);
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return dataSink;
   }
@@ -81,8 +80,6 @@ public class UpdateStmt extends ModifyStmt {
     StringBuilder b = new StringBuilder();
     b.append("UPDATE ");
 
-    if (ignoreNotFound_) b.append("IGNORE ");
-
     if (fromClause_ == null) {
       b.append(targetTableRef_.toSql());
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index 0a99af4..35b9022 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -30,6 +30,7 @@ import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TKuduTableSink;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -40,16 +41,13 @@ public class KuduTableSink extends TableSink {
 
   // Optional list of referenced Kudu table column indices. The position of a result
   // expression i matches a column index into the Kudu schema at targetColdIdxs[i].
-  private ArrayList<Integer> targetColIdxs_;
-
-  private final boolean ignoreNotFoundOrDuplicate_;
+  private final ArrayList<Integer> targetColIdxs_;
 
   public KuduTableSink(Table targetTable, Op sinkOp,
-      List<Integer> referencedColumns, boolean ignoreNotFoundOrDuplicate) {
+      List<Integer> referencedColumns) {
     super(targetTable, sinkOp);
     targetColIdxs_ = referencedColumns != null
         ? Lists.newArrayList(referencedColumns) : null;
-    ignoreNotFoundOrDuplicate_ = ignoreNotFoundOrDuplicate;
   }
 
   @Override
@@ -58,16 +56,6 @@ public class KuduTableSink extends TableSink {
     StringBuilder output = new StringBuilder();
     output.append(prefix + sinkOp_.toExplainString());
     output.append(" KUDU [" + targetTable_.getFullName() + "]\n");
-    if (sinkOp_ != Op.UPSERT) {
-      output.append(detailPrefix);
-      if (sinkOp_ == Op.INSERT) {
-        output.append("check unique keys: ");
-      } else {
-        output.append("check keys exist: ");
-      }
-      output.append(ignoreNotFoundOrDuplicate_);
-      output.append("\n");
-    }
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes()));
       output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
@@ -83,7 +71,6 @@ public class KuduTableSink extends TableSink {
         TTableSinkType.KUDU, sinkOp_.toThrift());
     TKuduTableSink tKuduSink = new TKuduTableSink();
     tKuduSink.setReferenced_columns(targetColIdxs_);
-    tKuduSink.setIgnore_not_found_or_duplicate(ignoreNotFoundOrDuplicate_);
     tTableSink.setKudu_table_sink(tKuduSink);
     result.table_sink = tTableSink;
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 102b280..742e6c9 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -17,15 +17,16 @@
 
 package org.apache.impala.planner;
 
+import java.util.List;
+
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.thrift.TSinkAction;
-import com.google.common.base.Preconditions;
 
-import java.util.List;
+import com.google.common.base.Preconditions;
 
 /**
  * A DataSink that writes into a table.
@@ -89,7 +90,7 @@ public abstract class TableSink extends DataSink {
    */
   public static TableSink create(Table table, Op sinkAction,
       List<Expr> partitionKeyExprs, List<Integer> referencedColumns,
-      boolean overwrite, boolean ignoreDuplicates) {
+      boolean overwrite) {
     if (table instanceof HdfsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
@@ -112,9 +113,7 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(overwrite == false);
       // Partition clauses don't make sense for Kudu inserts.
       Preconditions.checkState(partitionKeyExprs.isEmpty());
-      // UPSERT is incompatible with ignoreDuplicates.
-      Preconditions.checkState(sinkAction != Op.UPSERT || !ignoreDuplicates);
-      return new KuduTableSink(table, sinkAction, referencedColumns, ignoreDuplicates);
+      return new KuduTableSink(table, sinkAction, referencedColumns);
     } else {
       throw new UnsupportedOperationException(
           "Cannot create data sink into table of type: " + table.getClass().getName());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
index 939d499..152add6 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
@@ -76,7 +76,6 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         ".testtbl a on a.id = b.id where a.id = 10");
 
     AnalyzesOk("delete from functional_kudu.testtbl");
-    AnalyzesOk("delete ignore from functional_kudu.testtbl");
     AnalyzesOk("delete functional_kudu.testtbl from functional_kudu.testtbl");
     AnalyzesOk("delete a from functional_kudu.testtbl a");
     AnalyzesOk("delete a from functional_kudu.testtbl a join functional.testtbl b " +
@@ -87,7 +86,6 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
   public void TestUpdate() {
     TestUtils.assumeKuduIsSupported();
     AnalyzesOk("update functional_kudu.dimtbl set name = 'Oskar'");
-    AnalyzesOk("update ignore functional_kudu.dimtbl set name = 'Oskar'");
     // Correct default database resolution
     AnalyzesOk("update dimtbl set name = 'Oskar'", createAnalyzer("functional_kudu"));
     // Correct table alias resolution

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 98863ce..0affd4e 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -1615,8 +1615,6 @@ public class ParserTest extends FrontendTestBase {
         "select a from src where b > 5");
     ParserError("insert into table t partition [shuffle] (pk1=10 pk2=20) " +
         "select a from src where b > 5");
-
-    ParsesOk("insert ignore into table t values (1,2,3)");
   }
 
   @Test
@@ -1666,8 +1664,6 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("update t set x = (3 + length(\"hallo\")) where a < 'adasas'");
     ParsesOk("update t set x = 3");
     ParsesOk("update t set x=3, x=4 from a.b t where b = 10");
-    ParsesOk("update ignore t set x = 3");
-    ParsesOk("update ignore t set x=3, x=4 from a.b t where b = 10");
     ParserError("update t");
     ParserError("update t set x < 3");
     ParserError("update t set x");
@@ -1687,11 +1683,9 @@ public class ParserTest extends FrontendTestBase {
   @Test
   public void TestDelete() {
     ParsesOk("delete from t");
-    ParsesOk("delete ignore from t");
     ParsesOk("delete a from t a");
     ParsesOk("delete a from t a join b on a.id = b.id where true");
     ParsesOk("delete a from t a join b where true");
-    ParsesOk("delete ignore a from t a join b where true");
     ParsesOk("delete t from t");
     ParsesOk("delete t from t where a < b");
     ParsesOk("delete a from t a where a < b");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
index 50c5b12..3a4b97a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
@@ -1,37 +1,21 @@
 delete from functional_kudu.testtbl
 ---- PLAN
 DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: false
-|
-00:SCAN KUDU [functional_kudu.testtbl]
-====
-delete ignore from functional_kudu.testtbl
----- PLAN
-DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: true
-|
-00:SCAN KUDU [functional_kudu.testtbl]
----- DISTRIBUTEDPLAN
-DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: true
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ====
 delete from functional_kudu.testtbl where name = 'hallo'
 ---- PLAN
 DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
    kudu predicates: name = 'hallo'
 ---- DISTRIBUTEDPLAN
 DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
    kudu predicates: name = 'hallo'
@@ -40,7 +24,6 @@ delete a from functional_kudu.testtbl a, functional.alltypes b
 where a.id = b.id and a.id in (select id from functional.alltypes)
 ---- PLAN
 DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: false
 |
 04:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: id = a.id
@@ -61,7 +44,6 @@ DELETE FROM KUDU [functional_kudu.testtbl]
    runtime filters: RF000 -> id
 ---- DISTRIBUTEDPLAN
 DELETE FROM KUDU [functional_kudu.testtbl]
-|  check keys exist: false
 |
 04:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
 |  hash predicates: id = a.id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
index a26ec40..4681b91 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
@@ -1,13 +1,11 @@
 update functional_kudu.testtbl set name = 'peter' where zip > 94549
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 kudu predicates: zip > 94549
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 kudu predicates: zip > 94549
@@ -16,13 +14,11 @@ UPDATE KUDU [functional_kudu.testtbl]
 update functional_kudu.testtbl set name = 'peter' where zip > 94549 and id = 5
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 kudu predicates: id = 5, zip > 94549
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 kudu predicates: id = 5, zip > 94549
@@ -31,13 +27,11 @@ UPDATE KUDU [functional_kudu.testtbl]
 update functional_kudu.testtbl set zip = 94546 where zip > 94549
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 kudu predicates: zip > 94549
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 kudu predicates: zip > 94549
@@ -48,7 +42,6 @@ from functional_kudu.testtbl a join functional.testtbl b on a.id = b.id
 where a.id = 10
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 02:HASH JOIN [INNER JOIN]
 | hash predicates: a.id = b.id
@@ -61,7 +54,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 kudu predicates: a.id = 10
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 | hash predicates: a.id = b.id
@@ -80,7 +72,6 @@ set a.name = 'values'
 from functional_kudu.testtbl a join (values(1 as ids, 2, 3) ) b on a.id = b.ids
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 02:HASH JOIN [INNER JOIN]
 | hash predicates: ids = a.id
@@ -91,7 +82,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 constant-operands=1
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 | hash predicates: ids = a.id
@@ -109,7 +99,6 @@ from functional_kudu.testtbl a
 where a.zip in (select zip from functional.testtbl limit 10)
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 02:HASH JOIN [LEFT SEMI JOIN]
 | hash predicates: a.zip = zip
@@ -121,7 +110,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 00:SCAN KUDU [functional_kudu.testtbl a]
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 | hash predicates: a.zip = zip
@@ -140,12 +128,10 @@ UPDATE KUDU [functional_kudu.testtbl]
 update functional_kudu.testtbl set zip = 94546 where false
 ---- PLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:EMPTYSET
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
-| check keys exist: false
 |
 00:EMPTYSET
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index a656341..fbb7f7f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -44,13 +44,11 @@ PLAN-ROOT SINK
 insert into functional_kudu.testtbl(id) values (10)
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 00:UNION
 constant-operands=1
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 00:UNION
 constant-operands=1
@@ -58,12 +56,10 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 insert into functional_kudu.testtbl(id) select int_col from functional_kudu.tinyinttable
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 00:SCAN KUDU [functional_kudu.tinyinttable]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 00:SCAN KUDU [functional_kudu.tinyinttable]
 ====
@@ -72,7 +68,6 @@ select count(distinct id), name from functional_kudu.dimtbl
 group by name
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 02:AGGREGATE [FINALIZE]
 | output: count(id)
@@ -84,7 +79,6 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 00:SCAN KUDU [functional_kudu.dimtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 02:AGGREGATE [FINALIZE]
 | output: count(id)
@@ -229,7 +223,6 @@ insert into table functional_kudu.alltypes /*+ clustered */
 select * from functional_kudu.alltypes
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
-| check unique keys: false
 |
 01:SORT
 | order by: id DESC NULLS LAST
@@ -237,7 +230,6 @@ INSERT INTO KUDU [functional_kudu.alltypes]
 00:SCAN KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
-| check unique keys: false
 |
 01:SORT
 | order by: id DESC NULLS LAST
@@ -253,7 +245,6 @@ from functional_kudu.testtbl group by id, name
 ) as sub;
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 02:SORT
 | order by: id DESC NULLS LAST
@@ -265,7 +256,6 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 00:SCAN KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
-| check unique keys: false
 |
 04:SORT
 | order by: id DESC NULLS LAST

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08d89a5c/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index 949a25c..761db13 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -213,13 +213,7 @@ row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 insert into tdata values
-(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true)
----- CATCH
-Kudu error(s) reported, first error: Already present
-====
----- QUERY
-insert ignore into tdata values
-(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true)
+(666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 0
 ---- RUNTIME_PROFILE
@@ -234,28 +228,14 @@ row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
 -- Does not exercise any error path in the sink because updating the same record multiple
--- times is valid. Makes sure IGNORE works.
-update ignore a set a.name='Satan' from tdata a, tdata b where a.id = 666
+-- times is valid.
+update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
 ---- RUNTIME_PROFILE
 row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
--- Using a cross join to generate the same delete twice. After the first delete succeeded,
--- trying to execute the second delete will fail because the record does not exist.
 delete a from tdata a, tdata b where a.id = 666
----- CATCH
-Kudu error(s) reported, first error: Not found: key not found
-====
----- QUERY
--- Re-insert the data
-insert into tdata values
-(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true)
----- RESULTS
-: 1
-====
----- QUERY
-delete ignore a from tdata a, tdata b where a.id = 666
 ---- RESULTS
 ---- RUNTIME_PROFILE
 row_regex: .*NumModifiedRows: 1.*
@@ -368,7 +348,7 @@ SELECT * FROM functional_kudu.alltypes WHERE id < 100;
 row_regex: .*NumModifiedRows: 100.*
 ====
 ---- QUERY
-INSERT IGNORE INTO kudu_test_tbl
+INSERT INTO kudu_test_tbl
 SELECT * FROM functional_kudu.alltypes WHERE id < 100;
 ---- RESULTS
 : 0
@@ -377,12 +357,6 @@ row_regex: .*NumModifiedRows: 0.*
 ====
 ---- QUERY
 INSERT INTO kudu_test_tbl
-SELECT * FROM functional_kudu.alltypes WHERE id < 100;
----- CATCH
-Kudu error(s) reported, first error: Already present: key already present
-====
----- QUERY
-INSERT IGNORE INTO kudu_test_tbl
 SELECT * FROM functional_kudu.alltypes;
 ---- RESULTS
 : 7200
@@ -413,15 +387,6 @@ FROM functional_kudu.alltypes
 Kudu error(s) reported, first error: Not found: No tablet covering the requested range partition: NonCoveredRange { lower_bound: (int32 id=10001), upper_bound: (<end>)
 ====
 ---- QUERY
-# Insert rows that are not covered by any of the existing range partitions
-INSERT IGNORE INTO kudu_test_tbl SELECT cast(id + 10000 as int), bool_col, tinyint_col,
- smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
- timestamp_col,year, month
-FROM functional_kudu.alltypes
----- CATCH
-Kudu error(s) reported, first error: Not found: No tablet covering the requested range partition: NonCoveredRange { lower_bound: (int32 id=10001), upper_bound: (<end>)
-====
----- QUERY
 # Try to delete a row with a primary key value that is not covered by the existing range
 # partitions
 DELETE FROM kudu_test_tbl WHERE id = 10001
