IMPALA-3710: Kudu DML should ignore conflicts, pt2

Second part of IMPALA-3710, which removed the IGNORE DML option and
changed the following errors on Kudu DML operations to be ignored:
1) INSERT where the PK already exists
2) UPDATE/DELETE where the PK doesn't exist
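
For illustration, a minimal sketch of behavior (1) in the format of the
kudu_crud tests updated below (the table 'example_tbl', its PK 'id', and
the counts are assumptions for illustration, not output from this change):
the conflicting row no longer fails the statement; it is skipped and
counted as a row error in the runtime profile.

  ---- QUERY
  # Hypothetical: 'example_tbl' is a Kudu table keyed on 'id'; the second
  # value conflicts with the first, so one row is written and one is ignored.
  insert into example_tbl values (1, 'a'), (1, 'b')
  ---- RESULTS
  : 1
  ---- RUNTIME_PROFILE
  NumModifiedRows: 1
  NumRowErrors: 1
  ====
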
This changes other data-related errors to be ignored as well:
3) NULLs in non-nullable columns, i.e. null constraint violations.
4) Rows with PKs that are in an 'uncovered range'.

It became clear that we can't differentiate between (3) and (4) because
both return a Kudu 'NotFound' error code. The Impala error codes have
been simplified as well: we just report a generic KUDU_NOT_FOUND error
in these cases.

This also adds some metadata to the thrift report sent to the
coordinator from sinks so the total number of rows with errors can be
added to the profile. Note that this does not include a breakdown of
error counts by type/code because we cannot differentiate between all
of these cases yet. An upcoming change will add this new info to the
beeswax interface and show it in the shell output (IMPALA-3713).

Testing: Updated kudu_crud tests to check the number of rows with errors.

Change-Id: I4eb1ad91dc355ea51de261c3a14df0f9d28c879c
Reviewed-on: http://gerrit.cloudera.org:8080/4985
Reviewed-by: Alex Behm <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cfac09de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cfac09de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cfac09de

Branch: refs/heads/hadoop-next
Commit: cfac09de10c996a48852b9a9d50c70cf24cf5f5f
Parents: d7246d6
Author: Matthew Jacobs <[email protected]>
Authored: Mon Nov 7 15:55:42 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Wed Nov 9 06:43:41 2016 +0000

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                   |  17 ++-
 be/src/exec/data-sink.h                    |   9 +-
 be/src/exec/hdfs-table-sink.cc             |   2 +-
 be/src/exec/kudu-table-sink.cc             |  89 +++++++-------
 be/src/exec/kudu-table-sink.h              |  31 +++---
 be/src/runtime/coordinator.cc              |   4 +-
 common/thrift/ImpalaInternalService.thrift |  21 +++-
 common/thrift/generate_error_codes.py      |   5 +-
 .../queries/QueryTest/kudu_crud.test       | 103 ++++++++-------
 9 files changed, 180 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 6a34543..17f1f8c 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -112,9 +112,18 @@ Status DataSink::CreateDataSink(ObjectPool* pool,
   return Status::OK();
 }
 
-void DataSink::MergeInsertStats(const TInsertStats& src_stats,
+void DataSink::MergeDmlStats(const TInsertStats& src_stats,
     TInsertStats* dst_stats) {
   dst_stats->bytes_written += src_stats.bytes_written;
+  if (src_stats.__isset.kudu_stats) {
+    if (!dst_stats->__isset.kudu_stats) dst_stats->__set_kudu_stats(TKuduDmlStats());
+    if (!dst_stats->kudu_stats.__isset.num_row_errors) {
+      dst_stats->kudu_stats.__set_num_row_errors(0);
+    }
+
+    dst_stats->kudu_stats.__set_num_row_errors(
+        dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors);
+  }
   if (src_stats.__isset.parquet_stats) {
     if (dst_stats->__isset.parquet_stats) {
       MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
@@ -125,7 +134,7 @@ void DataSink::MergeInsertStats(const TInsertStats& src_stats,
   }
 }
 
-string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
+string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
     const string& prefix) {
   const char* indent = "  ";
   stringstream ss;
@@ -148,6 +157,10 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
     if (!val.second.__isset.stats) continue;
     const TInsertStats& stats = val.second.stats;
 
+    if (stats.__isset.kudu_stats) {
+      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+    }
+
     ss << indent << "BytesWritten: "
        << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
     if (stats.__isset.parquet_stats) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 95acf16..1970cf2 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -85,14 +85,13 @@ class DataSink {
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
       const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink);
 
-  /// Merges one update to the insert stats for a partition. dst_stats will have the
+  /// Merges one update to the DML stats for a partition. dst_stats will have the
   /// combined stats of src_stats and dst_stats after this method returns.
-  static void MergeInsertStats(const TInsertStats& src_stats,
+  static void MergeDmlStats(const TInsertStats& src_stats,
       TInsertStats* dst_stats);
 
-  /// Outputs the insert stats contained in the map of insert partition updates to a
-  /// string
-  static std::string OutputInsertStats(const PartitionStatusMap& stats,
+  /// Outputs the DML stats contained in the map of partition updates to a string
+  static std::string OutputDmlStats(const PartitionStatusMap& stats,
       const std::string& prefix = "");
 
   MemTracker* mem_tracker() const { return mem_tracker_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 63bd648..07c1167 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -595,7 +595,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
     // initialised.
    DCHECK(it != state->per_partition_status()->end());
    it->second.num_modified_rows += partition->num_rows;
-   DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
+   DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats);
  }
  RETURN_IF_ERROR(ClosePartitionFile(state, partition));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index a58baab..11613aa 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -61,9 +61,9 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
     select_list_texprs_(select_list_texprs),
     sink_action_(tsink.table_sink.action),
     kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-    kudu_error_counter_(NULL),
-    rows_written_(NULL),
-    rows_written_rate_(NULL) {
+    total_rows_(NULL),
+    num_row_errors_(NULL),
+    rows_processed_rate_(NULL) {
   DCHECK(KuduIsAvailable());
 }
 
@@ -96,15 +96,19 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   TInsertPartitionStatus root_status;
   root_status.__set_num_modified_rows(0L);
   root_status.__set_id(-1L);
+  TKuduDmlStats kudu_dml_stats;
+  kudu_dml_stats.__set_num_row_errors(0L);
+  root_status.__set_stats(TInsertStats());
+  root_status.stats.__set_kudu_stats(kudu_dml_stats);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
 
   // Add counters
-  kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT);
-  rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT);
+  total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT);
+  num_row_errors_ = ADD_COUNTER(profile(), "NumRowErrors", TUnit::UNIT);
   kudu_apply_timer_ = ADD_TIMER(profile(), "KuduApplyTimer");
-  rows_written_rate_ = profile()->AddDerivedCounter(
-      "RowsWrittenRate", TUnit::UNIT_PER_SECOND,
-      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_,
+  rows_processed_rate_ = profile()->AddDerivedCounter(
+      "RowsProcessedRate", TUnit::UNIT_PER_SECOND,
+      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, total_rows_,
           profile()->total_time_counter()));
 
   return Status::OK();
@@ -174,16 +178,21 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
+  const KuduSchema& table_schema = table_->schema();
 
   // Collect all write operations and apply them together so the time in Apply() can be
   // easily timed.
   vector<unique_ptr<kudu::client::KuduWriteOperation>> write_ops;
-  int rows_added = 0;
+
+  // Count the number of rows with nulls in non-nullable columns, i.e. null constraint
+  // violations.
+  int num_null_violations = 0;
 
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
     unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    bool add_row = true;
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
       // For INSERT, output_expr_ctxs_ will contain all columns of the table in order.
@@ -198,14 +207,20 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
       // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT.
       // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL.
       if (value == NULL) {
-        if (sink_action_ == TSinkAction::UPDATE || sink_action_ == TSinkAction::UPSERT) {
-          DCHECK(!kudu_table_sink_.referenced_columns.empty());
+        if (table_schema.Column(j).is_nullable()) {
           KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),
               "Could not add Kudu WriteOp.");
+          continue;
         } else {
-          DCHECK(kudu_table_sink_.referenced_columns.empty());
+          // This row violates the nullability constraints of the column, do not attempt
+          // to write this row because it is already known to be an error and the Kudu
+          // error will be difficult to interpret later (error code isn't specific).
+          ++num_null_violations;
+          state->LogError(ErrorMsg::Init(TErrorCode::KUDU_NULL_CONSTRAINT_VIOLATION,
+              table_desc_->table_name()));
+          add_row = false;
+          break;  // skip remaining columns for this row
         }
-        continue;
       }
 
       PrimitiveType type = output_expr_ctxs_[j]->root()->type().type;
@@ -257,18 +272,22 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
           return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
       }
     }
-    write_ops.push_back(move(write));
+    if (add_row) write_ops.push_back(move(write));
   }
 
   {
     SCOPED_TIMER(kudu_apply_timer_);
     for (auto&& write: write_ops) {
       KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
-      ++rows_added;
     }
   }
-  COUNTER_ADD(rows_written_, rows_added);
+
+  // Increment for all rows received by the sink, including errors.
+  COUNTER_ADD(total_rows_, batch->num_rows());
+  // Add the number of null constraint violations to the number of row errors, which
+  // isn't reported by Kudu in CheckForErrors() because those rows were never
+  // successfully added to the KuduSession.
+  COUNTER_ADD(num_row_errors_, num_null_violations);
   RETURN_IF_ERROR(CheckForErrors(state));
   return Status::OK();
 }
@@ -281,7 +300,6 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
 
   // Get the pending errors from the Kudu session. If errors overflowed the error buffer
   // we can't be sure all errors can be ignored, so an error status will be reported.
-  // TODO: Make sure Kudu handles conflict errors properly if IGNORE is set (KUDU-1563).
   bool error_overflow = false;
   session_->GetPendingErrors(&errors, &error_overflow);
   if (UNLIKELY(error_overflow)) {
@@ -292,28 +310,27 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
   // them accordingly.
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
-    // 'Duplicate key' or 'key already present' errors from Kudu do not fail the query.
-    if ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
-        (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
-        (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent())) {
-      if (status.ok()) {
-        status = Status(strings::Substitute(
-            "Kudu error(s) reported, first error: $0", e.ToString()));
-      }
-    }
     if (e.IsNotFound()) {
-      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_NOT_FOUND,
-          table_desc_->table_name()));
+      // Kudu does not yet have a way to programmatically differentiate between 'row not
+      // found' and 'tablet not found' (e.g. PK in a non-covered range) and both have the
+      // IsNotFound error code.
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_NOT_FOUND,
+          table_desc_->table_name(), e.ToString()), 2);
     } else if (e.IsAlreadyPresent()) {
       state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_ALREADY_PRESENT,
-          table_desc_->table_name()));
+          table_desc_->table_name()), 2);
     } else {
+      if (status.ok()) {
+        status = Status(strings::Substitute(
+            "Kudu error(s) reported, first error: $0", e.ToString()));
+      }
       state->LogError(ErrorMsg::Init(TErrorCode::KUDU_SESSION_ERROR,
-          table_desc_->table_name(), e.ToString()));
+          table_desc_->table_name(), e.ToString()), 2);
     }
     delete errors[i];
   }
-  COUNTER_ADD(kudu_error_counter_, errors.size());
+
+  COUNTER_ADD(num_row_errors_, errors.size());
   return status;
 }
 
@@ -326,10 +343,12 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
     VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
   }
   Status status = CheckForErrors(state);
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
-      rows_written_->value() - kudu_error_counter_->value());
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_kudu_latest_observed_ts(
-      client_->GetLatestObservedTimestamp());
+  TInsertPartitionStatus& insert_status =
+      (*state->per_partition_status())[ROOT_PARTITION_KEY];
+  insert_status.__set_num_modified_rows(
+      total_rows_->value() - num_row_errors_->value());
+  insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value());
+  insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp());
 
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index adf264e..3ff667c 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -44,14 +44,13 @@ namespace impala {
 /// because Kudu currently has some 8MB buffer limits.
 ///
 /// Kudu doesn't have transactions yet, so some rows may fail to write while others are
-/// successful. The Kudu client reports errors, some of which may be considered to be
-/// expected: rows that fail to be written/updated/deleted due to a key conflict will not
-/// result in the sink returning an error. Any other kind of error reported by Kudu
-/// results in the sink returning an error status. The first non-ignored error is
-/// returned in the sink's Status. All reported errors (ignored or not) will be logged
-/// via the RuntimeState.
-/// TODO: Handle other data/constraint-violation errors as ignored, e.g. null in
-/// non-nullable col
+/// successful. The Kudu client reports errors, some of which are treated as warnings and
+/// will not fail the query: PK already exists on INSERT, key not found on UPDATE/DELETE,
+/// NULL in a non-nullable column, and PK specifying rows in an uncovered range.
+/// The number of rows that cannot be modified due to these errors is reported in the
+/// TInsertPartitionStatus report sent by the DataSink to the coordinator.
+/// Any other kind of error reported by Kudu results in the sink returning an error
+/// status. All reported errors (ignored or not) will be logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,
@@ -101,7 +100,6 @@ class KuduTableSink : public DataSink {
   std::vector<ExprContext*> output_expr_ctxs_;
 
   /// The Kudu client, table and session.
-  /// This uses 'kudu::client::sp::shared_ptr' as that is the type expected by Kudu.
   kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
   kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
   kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_;
@@ -112,19 +110,22 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
-  /// Total number of errors returned from Kudu.
-  RuntimeProfile::Counter* kudu_error_counter_;
-
   /// Time spent applying Kudu operations. In normal circumstances, Apply() should be
   /// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
   /// Significant time spent in Apply() may indicate that Kudu cannot buffer and send
   /// rows as fast as the sink can write them.
   RuntimeProfile::Counter* kudu_apply_timer_;
 
-  /// Total number of rows written including errors.
-  RuntimeProfile::Counter* rows_written_;
-  RuntimeProfile::Counter* rows_written_rate_;
+  /// Total number of rows processed, i.e. rows written to Kudu and also rows with
+  /// errors.
+  RuntimeProfile::Counter* total_rows_;
+
+  /// The number of rows with errors.
+  RuntimeProfile::Counter* num_row_errors_;
+
+  /// Rate at which the sink consumes and processes rows, i.e. writing rows to Kudu or
+  /// skipping rows that are known to violate nullability constraints.
+  RuntimeProfile::Counter* rows_processed_rate_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index c9a166d..0a278ae 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1090,7 +1090,7 @@ Status Coordinator::Wait() {
   if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
 
   query_profile_->AddInfoString(
-      "Insert Stats", DataSink::OutputInsertStats(per_partition_status_, "\n"));
+      "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
   // For DML queries, when Wait is done, the query is complete. Report aggregate
   // query profiles at this point.
   // TODO: make sure ReportQuerySummary gets called on error
@@ -1489,7 +1489,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
 
       if (partition.second.__isset.stats) {
         if (!status->__isset.stats) status->__set_stats(TInsertStats());
-        DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+        DataSink::MergeDmlStats(partition.second.stats, &status->stats);
       }
     }
     files_to_move_.insert(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 15c0020..d5a92ca 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -410,16 +410,28 @@ struct TParquetInsertStats {
   1: required map<string, i64> per_column_size
 }
 
-// Per partition insert stats
+struct TKuduDmlStats {
+  // The number of reported per-row errors, i.e. this many rows were not modified.
+  // Note that this aggregate is less useful than a breakdown of the number of errors by
+  // error type, e.g. number of rows with duplicate key conflicts, number of rows
+  // violating nullability constraints, etc., but it isn't possible yet to differentiate
+  // all error types in the KuduTableSink.
+  1: optional i64 num_row_errors
+}
+
+// Per partition DML stats
 // TODO: this should include the table stats that we update the metastore with.
+// TODO: Refactor to reflect usage by other DML statements.
 struct TInsertStats {
   1: required i64 bytes_written
   2: optional TParquetInsertStats parquet_stats
+  3: optional TKuduDmlStats kudu_stats
 }
 
 const string ROOT_PARTITION_KEY = ''
 
-// Per-partition statistics and metadata resulting from INSERT queries.
+// Per-partition statistics and metadata resulting from DML statements.
+// TODO: Refactor to reflect usage by other DML statements.
 struct TInsertPartitionStatus {
   // The id of the partition written to (may be -1 if the partition is created by this
   // query). See THdfsTable.partitions.
@@ -434,13 +446,14 @@ struct TInsertPartitionStatus {
   // Fully qualified URI to the base directory for this partition.
   4: required string partition_base_dir
 
-  // The latest observed Kudu timestamp reported by the KuduSession at this partition.
+  // The latest observed Kudu timestamp reported by the local KuduSession.
   // This value is an unsigned int64.
   5: optional i64 kudu_latest_observed_ts
 }
 
-// The results of an INSERT query, sent to the coordinator as part of
+// The results of a DML statement, sent to the coordinator as part of
 // TReportExecStatusParams
+// TODO: Refactor to reflect usage by other DML statements.
 struct TInsertExecStatus {
   // A map from temporary absolute file path to final absolute destination. The
   // coordinator performs these updates after the query completes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index e9df146..c96cfc8 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -295,7 +295,7 @@ error_codes = (
   ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
 
-  ("KUDU_KEY_NOT_FOUND", 96, "Key not found in Kudu table '$0'."),
+  ("KUDU_NOT_FOUND", 96, "Not found in Kudu table '$0': $1"),
 
   ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1"),
 
@@ -303,6 +303,9 @@ error_codes = (
 
   ("AVRO_INVALID_DECIMAL", 99,
    "Column '$0': invalid Avro decimal type with precision = '$1' scale = '$2'"),
+
+  ("KUDU_NULL_CONSTRAINT_VIOLATION", 100,
+   "Row with null value violates nullability constraint on table '$0'."),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index 761db13..1ecfdf5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -31,14 +31,16 @@ insert into tdata values
 ---- RESULTS
 : 3
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 3.*
+NumModifiedRows: 3
+NumRowErrors: 0
 ====
 ---- QUERY
 update tdata set vali=43 where id = 1
 ---- RESULTS
 # TODO: Verify row count after fixing IMPALA-3713 (Here and UPDATE/DELETE below)
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -56,7 +58,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as varchar(20)) where id = 1
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -71,7 +74,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set valb=false where id = 1
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -86,7 +90,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set vali=43 where id > 1
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 2.*
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -101,7 +106,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set name='unknown' where name = 'martin'
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -119,7 +125,8 @@ insert into tdata values
 ---- RESULTS
 : 2
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 2.*
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -136,7 +143,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set name=null where id = 40
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -152,7 +160,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name='he' where id = 40
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ---- RESULTS
 ====
 ---- QUERY
@@ -173,7 +182,8 @@ insert into tdata values (320, '', 2.0, 932, cast('' as string), false)
 ---- RESULTS
 : 1
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select id, name, valv, valb from tdata where id = 320;
@@ -194,7 +204,8 @@ insert into ignore_column_case values (1, 'Martin', 1.0, 10);
 ---- RESULTS
 : 1
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select ID, nAmE, VALF, VALI from ignore_column_case where NaMe = 'Martin';
@@ -209,7 +220,8 @@ insert into tdata values
 ---- RESULTS
 : 1
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 insert into tdata values
@@ -217,28 +229,23 @@ insert into tdata values
 ---- RESULTS
 : 0
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 0.*
+NumModifiedRows: 0
+NumRowErrors: 1
 ====
 ---- QUERY
 -- Updating the same record many times: cross join produces 7 identical updates
 update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7.*
-====
----- QUERY
--- Does not exercise any error path in the sink because updating the same record multiple
--- times is valid.
-update a set a.name='Satan' from tdata a, tdata b where a.id = 666
----- RESULTS
----- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7.*
+NumModifiedRows: 7
+NumRowErrors: 0
 ====
 ---- QUERY
 delete a from tdata a, tdata b where a.id = 666
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 6
 ====
 ---- QUERY
 select * from tdata
@@ -256,6 +263,9 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 upsert into table tdata values
 (40, 'they', 1, 43, cast('e' as VARCHAR(20)), false),
 (1, NULL, 1, 0, cast('a' as VARCHAR(20)), true)
 ---- RESULTS
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -306,8 +316,9 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
 ---- QUERY
 upsert into table tdata (id, name) values (null, '')
----- CATCH
-Could not add Kudu WriteOp.: Invalid argument: column not nullable: id[int32 NOT NULL]
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 1
 ====
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
@@ -328,7 +339,8 @@ insert into impala_3454 values
 delete from impala_3454 where key_1 < (select max(key_2) from impala_3454)
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 2.*
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from impala_3454
@@ -345,7 +357,8 @@ SELECT * FROM functional_kudu.alltypes WHERE id < 100;
 ---- RESULTS
 'Inserted 100 row(s)'
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 100.*
+NumModifiedRows: 100
+NumRowErrors: 0
 ====
 ---- QUERY
 INSERT INTO kudu_test_tbl
@@ -353,7 +366,8 @@ SELECT * FROM functional_kudu.alltypes WHERE id < 100;
 ---- RESULTS
 : 0
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 0.*
+NumModifiedRows: 0
+NumRowErrors: 100
 ====
 ---- QUERY
 INSERT INTO kudu_test_tbl
@@ -361,42 +375,53 @@ SELECT * FROM functional_kudu.alltypes;
 ---- RESULTS
 : 7200
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7200.*
+NumModifiedRows: 7200
+NumRowErrors: 100
 ====
 ---- QUERY
 # Test a larger UPDATE
 UPDATE kudu_test_tbl SET int_col = -1;
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7300.*
+NumModifiedRows: 7300
+NumRowErrors: 0
 ====
 ---- QUERY
 # Test a larger DELETE
 DELETE FROM kudu_test_tbl WHERE id > -1;
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7300.*
+NumModifiedRows: 7300
+NumRowErrors: 0
 ====
 ---- QUERY
 # Insert rows that are not covered by any of the existing range partitions
+# Only the row at 10000 is inserted.
 INSERT INTO kudu_test_tbl
 SELECT cast(id + 10000 as int), bool_col, tinyint_col, smallint_col, int_col, bigint_col,
 float_col, double_col, date_string_col, string_col, timestamp_col, year, month
 FROM functional_kudu.alltypes
----- CATCH
-Kudu error(s) reported, first error: Not found: No tablet covering the requested range partition: NonCoveredRange { lower_bound: (int32 id=10001), upper_bound: (<end>)
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 7299
 ====
 ---- QUERY
 # Try to delete a row with a primary key value that is not covered by the existing range
-# partitions
+# partitions. This doesn't actually end up selecting any rows to modify.
 DELETE FROM kudu_test_tbl WHERE id = 10001
 ---- RESULTS
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 0
 ====
 ---- QUERY
 # Try to update a row with a primary key value that is not covered by the existing range
-# partitions
+# partitions. This doesn't actually end up selecting any rows to modify.
 UPDATE kudu_test_tbl SET int_col = 10 WHERE id = 10001
 ---- RESULTS
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 0
 ====
 ---- QUERY
 # IMPALA-2521: clustered insert into table.
@@ -414,6 +439,9 @@ from functional_kudu.alltypessmall group by id, name ) as sub;
 ---- RESULTS
 : 10
+---- RUNTIME_PROFILE
+NumModifiedRows: 10
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from impala_2521
@@ -442,7 +470,10 @@ create table allkeytypes (i1 tinyint, i2 smallint, i3 int, i4 bigint, name strin
 ---- QUERY
 insert into allkeytypes select cast(id as tinyint), smallint_col, int_col,
   cast (bigint_col/10 as bigint), string_col, float_col, double_col
-  from functional.alltypes where id > 0 and id < 4
+  from functional.alltypes where id > 0 and id < 10
 ---- RESULTS
 : 3
+---- RUNTIME_PROFILE
+NumModifiedRows: 3
+NumRowErrors: 6
 ====
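
A closing illustration of how ignored errors surface, again in the test
format above and again hypothetical (a table 'example_tbl' with three rows
and PK 'id'; the counts assume the semantics described in the commit
message): a self-join that deletes the same key several times succeeds
once, and each subsequent DELETE hits Kudu's 'key not found' path, is
logged as a KUDU_NOT_FOUND warning, and is counted in NumRowErrors rather
than failing the query.

  ---- QUERY
  # Hypothetical: the join produces three DELETEs of key 1; the first
  # succeeds and the remaining two are counted as row errors.
  delete t1 from example_tbl t1, example_tbl t2 where t1.id = 1
  ---- RESULTS
  ---- RUNTIME_PROFILE
  NumModifiedRows: 1
  NumRowErrors: 2
  ====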
