IMPALA-3710: Kudu DML should ignore conflicts, pt2

Second part of IMPALA-3710, which removed the IGNORE DML
option and made Kudu DML operations ignore the following
errors:
1) INSERT where the PK already exists
2) UPDATE/DELETE where the PK doesn't exist

This change ignores other data-related errors as well:
3) NULLs in non-nullable columns, i.e. null constraint
  violations.
4) Rows with PKs that are in an 'uncovered range'.
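
As a sketch of the new behavior, assume a hypothetical Kudu
table 't' with a non-nullable INT primary key 'id' and a
STRING column 'name', already containing a row with id=1.
Each of the following statements now succeeds, with the
conflicting rows counted as row errors instead of failing
the query:

  -- Case (1): PK 1 already exists, so 0 rows are modified
  -- and 1 row error is counted.
  insert into t values (1, 'b');

  -- Case (2): the self-join produces several DELETE ops for
  -- the same key; only the first finds the row, the rest
  -- are counted as 'NotFound' row errors.
  delete a from t a, t b where a.id = 1;

  -- Case (3): NULL in the non-nullable key column is caught
  -- in the sink before a write is ever sent to Kudu.
  insert into t (id, name) values (null, 'c');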

It became clear that we can't differentiate between (2) and
(4) because both return a Kudu 'NotFound' error code. The
Impala error codes have been simplified as well: we just
report a generic KUDU_NOT_FOUND error in these cases.

This also adds some metadata to the thrift report sent to
the coordinator from sinks so the total number of rows with
errors can be added to the profile. Note that this does not
include a breakdown of error counts by type/code because we
cannot differentiate between all of these cases yet.
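
For example, after a statement where 100 of 7300 rows hit
one of the ignorable errors, the runtime profile would show
(mirroring one of the updated kudu_crud.test expectations
below):

  NumModifiedRows: 7200
  NumRowErrors: 100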

An upcoming change will add this new info to the beeswax
interface and show it in the shell output (IMPALA-3713).

Testing: Updated kudu_crud tests to check the number of rows
with errors.

Change-Id: I4eb1ad91dc355ea51de261c3a14df0f9d28c879c
Reviewed-on: http://gerrit.cloudera.org:8080/4985
Reviewed-by: Alex Behm <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cfac09de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cfac09de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cfac09de

Branch: refs/heads/hadoop-next
Commit: cfac09de10c996a48852b9a9d50c70cf24cf5f5f
Parents: d7246d6
Author: Matthew Jacobs <[email protected]>
Authored: Mon Nov 7 15:55:42 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Wed Nov 9 06:43:41 2016 +0000

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                        |  17 ++-
 be/src/exec/data-sink.h                         |   9 +-
 be/src/exec/hdfs-table-sink.cc                  |   2 +-
 be/src/exec/kudu-table-sink.cc                  |  89 +++++++++-------
 be/src/exec/kudu-table-sink.h                   |  31 +++---
 be/src/runtime/coordinator.cc                   |   4 +-
 common/thrift/ImpalaInternalService.thrift      |  21 +++-
 common/thrift/generate_error_codes.py           |   5 +-
 .../queries/QueryTest/kudu_crud.test            | 103 ++++++++++++-------
 9 files changed, 180 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 6a34543..17f1f8c 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -112,9 +112,18 @@ Status DataSink::CreateDataSink(ObjectPool* pool,
   return Status::OK();
 }
 
-void DataSink::MergeInsertStats(const TInsertStats& src_stats,
+void DataSink::MergeDmlStats(const TInsertStats& src_stats,
     TInsertStats* dst_stats) {
   dst_stats->bytes_written += src_stats.bytes_written;
+  if (src_stats.__isset.kudu_stats) {
+    if (!dst_stats->__isset.kudu_stats) dst_stats->__set_kudu_stats(TKuduDmlStats());
+    if (!dst_stats->kudu_stats.__isset.num_row_errors) {
+      dst_stats->kudu_stats.__set_num_row_errors(0);
+    }
+
+    dst_stats->kudu_stats.__set_num_row_errors(
+        dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors);
+  }
   if (src_stats.__isset.parquet_stats) {
     if (dst_stats->__isset.parquet_stats) {
       MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
@@ -125,7 +134,7 @@ void DataSink::MergeInsertStats(const TInsertStats& src_stats,
   }
 }
 
-string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
+string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
     const string& prefix) {
   const char* indent = "  ";
   stringstream ss;
@@ -148,6 +157,10 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
 
     if (!val.second.__isset.stats) continue;
     const TInsertStats& stats = val.second.stats;
+    if (stats.__isset.kudu_stats) {
+      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+    }
+
     ss << indent << "BytesWritten: "
        << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
     if (stats.__isset.parquet_stats) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 95acf16..1970cf2 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -85,14 +85,13 @@ class DataSink {
     const TPlanFragmentInstanceCtx& fragment_instance_ctx,
     const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink);
 
-  /// Merges one update to the insert stats for a partition. dst_stats will have the
+  /// Merges one update to the DML stats for a partition. dst_stats will have the
   /// combined stats of src_stats and dst_stats after this method returns.
-  static void MergeInsertStats(const TInsertStats& src_stats,
+  static void MergeDmlStats(const TInsertStats& src_stats,
       TInsertStats* dst_stats);
 
-  /// Outputs the insert stats contained in the map of insert partition updates to a
-  /// string
-  static std::string OutputInsertStats(const PartitionStatusMap& stats,
+  /// Outputs the DML stats contained in the map of partition updates to a string
+  static std::string OutputDmlStats(const PartitionStatusMap& stats,
       const std::string& prefix = "");
 
   MemTracker* mem_tracker() const { return mem_tracker_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 63bd648..07c1167 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -595,7 +595,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
     // initialised.
     DCHECK(it != state->per_partition_status()->end());
     it->second.num_modified_rows += partition->num_rows;
-    DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
+    DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats);
   }
 
   RETURN_IF_ERROR(ClosePartitionFile(state, partition));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index a58baab..11613aa 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -61,9 +61,9 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
       select_list_texprs_(select_list_texprs),
       sink_action_(tsink.table_sink.action),
       kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-      kudu_error_counter_(NULL),
-      rows_written_(NULL),
-      rows_written_rate_(NULL) {
+      total_rows_(NULL),
+      num_row_errors_(NULL),
+      rows_processed_rate_(NULL) {
   DCHECK(KuduIsAvailable());
 }
 
@@ -96,15 +96,19 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   TInsertPartitionStatus root_status;
   root_status.__set_num_modified_rows(0L);
   root_status.__set_id(-1L);
+  TKuduDmlStats kudu_dml_stats;
+  kudu_dml_stats.__set_num_row_errors(0L);
+  root_status.__set_stats(TInsertStats());
+  root_status.stats.__set_kudu_stats(kudu_dml_stats);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
 
   // Add counters
-  kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT);
-  rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT);
+  total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT);
+  num_row_errors_ = ADD_COUNTER(profile(), "NumRowErrors", TUnit::UNIT);
   kudu_apply_timer_ = ADD_TIMER(profile(), "KuduApplyTimer");
-  rows_written_rate_ = profile()->AddDerivedCounter(
-      "RowsWrittenRate", TUnit::UNIT_PER_SECOND,
-      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_,
+  rows_processed_rate_ = profile()->AddDerivedCounter(
+      "RowsProcessedRate", TUnit::UNIT_PER_SECOND,
+      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, total_rows_,
       profile()->total_time_counter()));
 
   return Status::OK();
@@ -174,16 +178,21 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
+  const KuduSchema& table_schema = table_->schema();
 
   // Collect all write operations and apply them together so the time in Apply() can be
   // easily timed.
   vector<unique_ptr<kudu::client::KuduWriteOperation>> write_ops;
 
-  int rows_added = 0;
+  // Count the number of rows with nulls in non-nullable columns, i.e. null constraint
+  // violations.
+  int num_null_violations = 0;
+
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
     unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    bool add_row = true;
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
       // For INSERT, output_expr_ctxs_ will contain all columns of the table in order.
@@ -198,14 +207,20 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
       // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT.
       // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL.
       if (value == NULL) {
-        if (sink_action_ == TSinkAction::UPDATE || sink_action_ == TSinkAction::UPSERT) {
-          DCHECK(!kudu_table_sink_.referenced_columns.empty());
+        if (table_schema.Column(j).is_nullable()) {
           KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),
               "Could not add Kudu WriteOp.");
+          continue;
         } else {
-          DCHECK(kudu_table_sink_.referenced_columns.empty());
+          // This row violates the nullability constraints of the column, do not attempt
+          // to write this row because it is already known to be an error and the Kudu
+          // error will be difficult to interpret later (error code isn't specific).
+          ++num_null_violations;
+          state->LogError(ErrorMsg::Init(TErrorCode::KUDU_NULL_CONSTRAINT_VIOLATION,
+              table_desc_->table_name()));
+          add_row = false;
+          break; // skip remaining columns for this row
         }
-        continue;
       }
 
       PrimitiveType type = output_expr_ctxs_[j]->root()->type().type;
@@ -257,18 +272,22 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
           return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
       }
     }
-    write_ops.push_back(move(write));
+    if (add_row) write_ops.push_back(move(write));
   }
 
   {
     SCOPED_TIMER(kudu_apply_timer_);
     for (auto&& write: write_ops) {
       KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
-      ++rows_added;
     }
   }
 
-  COUNTER_ADD(rows_written_, rows_added);
+  // Increment for all rows received by the sink, including errors.
+  COUNTER_ADD(total_rows_, batch->num_rows());
+  // Add the number of null constraint violations to the number of row errors, which
+  // isn't reported by Kudu in CheckForErrors() because those rows were never
+  // successfully added to the KuduSession.
+  COUNTER_ADD(num_row_errors_, num_null_violations);
   RETURN_IF_ERROR(CheckForErrors(state));
   return Status::OK();
 }
@@ -281,7 +300,6 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
 
   // Get the pending errors from the Kudu session. If errors overflowed the error buffer
   // we can't be sure all errors can be ignored, so an error status will be reported.
-  // TODO: Make sure Kudu handles conflict errors properly if IGNORE is set (KUDU-1563).
   bool error_overflow = false;
   session_->GetPendingErrors(&errors, &error_overflow);
   if (UNLIKELY(error_overflow)) {
@@ -292,28 +310,27 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
   // them accordingly.
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
-    // 'Duplicate key' or 'key already present' errors from Kudu do not fail the query.
-    if ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
-        (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
-        (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent())) {
-      if (status.ok()) {
-        status = Status(strings::Substitute(
-            "Kudu error(s) reported, first error: $0", e.ToString()));
-      }
-    }
     if (e.IsNotFound()) {
-      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_NOT_FOUND,
-          table_desc_->table_name()));
+      // Kudu does not yet have a way to programmatically differentiate between 'row not
+      // found' and 'tablet not found' (e.g. PK in a non-covered range) and both have the
+      // IsNotFound error code.
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_NOT_FOUND,
+          table_desc_->table_name(), e.ToString()), 2);
     } else if (e.IsAlreadyPresent()) {
       state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_ALREADY_PRESENT,
-          table_desc_->table_name()));
+          table_desc_->table_name()), 2);
     } else {
+      if (status.ok()) {
+        status = Status(strings::Substitute(
+            "Kudu error(s) reported, first error: $0", e.ToString()));
+      }
       state->LogError(ErrorMsg::Init(TErrorCode::KUDU_SESSION_ERROR,
-          table_desc_->table_name(), e.ToString()));
+          table_desc_->table_name(), e.ToString()), 2);
     }
     delete errors[i];
   }
-  COUNTER_ADD(kudu_error_counter_, errors.size());
+
+  COUNTER_ADD(num_row_errors_, errors.size());
   return status;
 }
 
@@ -326,10 +343,12 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
     VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
   }
   Status status = CheckForErrors(state);
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
-      rows_written_->value() - kudu_error_counter_->value());
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_kudu_latest_observed_ts(
-      client_->GetLatestObservedTimestamp());
+  TInsertPartitionStatus& insert_status =
+      (*state->per_partition_status())[ROOT_PARTITION_KEY];
+  insert_status.__set_num_modified_rows(
+      total_rows_->value() - num_row_errors_->value());
+  insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value());
+  insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index adf264e..3ff667c 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -44,14 +44,13 @@ namespace impala {
 /// because Kudu currently has some 8MB buffer limits.
 ///
 /// Kudu doesn't have transactions yet, so some rows may fail to write while others are
-/// successful. The Kudu client reports errors, some of which may be considered to be
-/// expected: rows that fail to be written/updated/deleted due to a key conflict will not
-/// result in the sink returning an error. Any other kind of error reported by Kudu
-/// results in the sink returning an error status. The first non-ignored error is
-/// returned in the sink's Status. All reported errors (ignored or not) will be logged
-/// via the RuntimeState.
-/// TODO: Handle other data/constraint-violation errors as ignored, e.g. null in
-/// non-nullable col
+/// successful. The Kudu client reports errors, some of which are treated as warnings and
+/// will not fail the query: PK already exists on INSERT, key not found on UPDATE/DELETE,
+/// NULL in a non-nullable column, and PK specifying rows in an uncovered range.
+/// The number of rows that cannot be modified due to these errors is reported in the
+/// TInsertPartitionStatus report sent by the DataSink to the coordinator.
+/// Any other kind of error reported by Kudu results in the sink returning an error
+/// status. All reported errors (ignored or not) will be logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,
@@ -101,7 +100,6 @@ class KuduTableSink : public DataSink {
   std::vector<ExprContext*> output_expr_ctxs_;
 
   /// The Kudu client, table and session.
-  /// This uses 'kudu::client::sp::shared_ptr' as that is the type expected by Kudu.
   kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
   kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
   kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_;
@@ -112,19 +110,22 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
-  /// Total number of errors returned from Kudu.
-  RuntimeProfile::Counter* kudu_error_counter_;
-
   /// Time spent applying Kudu operations. In normal circumstances, Apply() should be
   /// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
   /// Significant time spent in Apply() may indicate that Kudu cannot buffer and send
   /// rows as fast as the sink can write them.
   RuntimeProfile::Counter* kudu_apply_timer_;
 
-  /// Total number of rows written including errors.
-  RuntimeProfile::Counter* rows_written_;
-  RuntimeProfile::Counter* rows_written_rate_;
+  /// Total number of rows processed, i.e. rows written to Kudu and also rows with
+  /// errors.
+  RuntimeProfile::Counter* total_rows_;
+
+  /// The number of rows with errors.
+  RuntimeProfile::Counter* num_row_errors_;
 
+  /// Rate at which the sink consumes and processes rows, i.e. writing rows to Kudu or
+  /// skipping rows that are known to violate nullability constraints.
+  RuntimeProfile::Counter* rows_processed_rate_;
 };
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index c9a166d..0a278ae 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1090,7 +1090,7 @@ Status Coordinator::Wait() {
   if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
 
   query_profile_->AddInfoString(
-      "Insert Stats", DataSink::OutputInsertStats(per_partition_status_, 
"\n"));
+      "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
   // For DML queries, when Wait is done, the query is complete.  Report aggregate
   // query profiles at this point.
   // TODO: make sure ReportQuerySummary gets called on error
@@ -1489,7 +1489,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
 
       if (partition.second.__isset.stats) {
         if (!status->__isset.stats) status->__set_stats(TInsertStats());
-        DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+        DataSink::MergeDmlStats(partition.second.stats, &status->stats);
       }
     }
     files_to_move_.insert(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 15c0020..d5a92ca 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -410,16 +410,28 @@ struct TParquetInsertStats {
   1: required map<string, i64> per_column_size
 }
 
-// Per partition insert stats
+struct TKuduDmlStats {
+  // The number of reported per-row errors, i.e. this many rows were not modified.
+  // Note that this aggregate is less useful than a breakdown of the number of errors by
+  // error type, e.g. number of rows with duplicate key conflicts, number of rows
+  // violating nullability constraints, etc., but it isn't possible yet to
+  // differentiate all error types in the KuduTableSink.
+  1: optional i64 num_row_errors
+}
+
+// Per partition DML stats
 // TODO: this should include the table stats that we update the metastore with.
+// TODO: Refactor to reflect usage by other DML statements.
 struct TInsertStats {
   1: required i64 bytes_written
   2: optional TParquetInsertStats parquet_stats
+  3: optional TKuduDmlStats kudu_stats
 }
 
 const string ROOT_PARTITION_KEY = ''
 
-// Per-partition statistics and metadata resulting from INSERT queries.
+// Per-partition statistics and metadata resulting from DML statements.
+// TODO: Refactor to reflect usage by other DML statements.
 struct TInsertPartitionStatus {
   // The id of the partition written to (may be -1 if the partition is created by this
   // query). See THdfsTable.partitions.
@@ -434,13 +446,14 @@ struct TInsertPartitionStatus {
   // Fully qualified URI to the base directory for this partition.
   4: required string partition_base_dir
 
-  // The latest observed Kudu timestamp reported by the KuduSession at this partition.
+  // The latest observed Kudu timestamp reported by the local KuduSession.
   // This value is an unsigned int64.
   5: optional i64 kudu_latest_observed_ts
 }
 
-// The results of an INSERT query, sent to the coordinator as part of
+// The results of a DML statement, sent to the coordinator as part of
 // TReportExecStatusParams
+// TODO: Refactor to reflect usage by other DML statements.
 struct TInsertExecStatus {
   // A map from temporary absolute file path to final absolute destination. The
   // coordinator performs these updates after the query completes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index e9df146..c96cfc8 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -295,7 +295,7 @@ error_codes = (
 
   ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
 
-  ("KUDU_KEY_NOT_FOUND", 96, "Key not found in Kudu table '$0'."),
+  ("KUDU_NOT_FOUND", 96, "Not found in Kudu table '$0': $1"),
 
   ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1"),
 
@@ -303,6 +303,9 @@ error_codes = (
 
   ("AVRO_INVALID_DECIMAL", 99,
       "Column '$0': invalid Avro decimal type with precision = '$1' scale = 
'$2'"),
+
+  ("KUDU_NULL_CONSTRAINT_VIOLATION", 100,
+      "Row with null value violates nullability constraint on table '$0'."),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cfac09de/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index 761db13..1ecfdf5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -31,14 +31,16 @@ insert into tdata values
 ---- RESULTS
 : 3
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 3.*
+NumModifiedRows: 3
+NumRowErrors: 0
 ====
 ---- QUERY
 update tdata set vali=43 where id = 1
 ---- RESULTS
 # TODO: Verify row count after fixing IMPALA-3713 (Here and UPDATE/DELETE below)
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -56,7 +58,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as varchar(20)) where id = 1
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -71,7 +74,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set valb=false where id = 1
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -86,7 +90,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set vali=43 where id > 1
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 2.*
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -101,7 +106,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set name='unknown' where name = 'martin'
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -119,7 +125,8 @@ insert into tdata values
 ---- RESULTS
 : 2
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 2.*
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -136,7 +143,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 update tdata set name=null where id = 40
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -152,7 +160,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name='he' where id = 40
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ---- RESULTS
 ====
 ---- QUERY
@@ -173,7 +182,8 @@ insert into tdata values (320, '', 2.0, 932, cast('' as string), false)
 ---- RESULTS
 : 1
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select id, name, valv, valb from tdata where id = 320;
@@ -194,7 +204,8 @@ insert into ignore_column_case values (1, 'Martin', 1.0, 10);
 ---- RESULTS
 : 1
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 select ID, nAmE, VALF, VALI from ignore_column_case where NaMe = 'Martin';
@@ -209,7 +220,8 @@ insert into tdata values
 ---- RESULTS
 : 1
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 0
 ====
 ---- QUERY
 insert into tdata values
@@ -217,28 +229,23 @@ insert into tdata values
 ---- RESULTS
 : 0
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 0.*
+NumModifiedRows: 0
+NumRowErrors: 1
 ====
 ---- QUERY
 -- Updating the same record many times: cross join produces 7 identical updates
 update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7.*
-====
----- QUERY
--- Does not exercise any error path in the sink because updating the same record multiple
--- times is valid.
-update a set a.name='Satan' from tdata a, tdata b where a.id = 666
----- RESULTS
----- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7.*
+NumModifiedRows: 7
+NumRowErrors: 0
 ====
 ---- QUERY
 delete a from tdata a, tdata b where a.id = 666
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 1.*
+NumModifiedRows: 1
+NumRowErrors: 6
 ====
 ---- QUERY
 select * from tdata
@@ -256,6 +263,9 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 upsert into table tdata values (40, 'they', 1, 43, cast('e' as VARCHAR(20)), false),
 (1, NULL, 1, 0, cast('a' as VARCHAR(20)), true)
 ---- RESULTS
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from tdata
@@ -306,8 +316,9 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
 ---- QUERY
 upsert into table tdata (id, name) values (null, '')
----- CATCH
-Could not add Kudu WriteOp.: Invalid argument: column not nullable: id[int32 NOT NULL]
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 1
 ====
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
@@ -328,7 +339,8 @@ insert into impala_3454 values
 delete from impala_3454 where key_1 < (select max(key_2) from impala_3454)
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 2.*
+NumModifiedRows: 2
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from impala_3454
@@ -345,7 +357,8 @@ SELECT * FROM functional_kudu.alltypes WHERE id < 100;
 ---- RESULTS
 'Inserted 100 row(s)'
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 100.*
+NumModifiedRows: 100
+NumRowErrors: 0
 ====
 ---- QUERY
 INSERT INTO kudu_test_tbl
@@ -353,7 +366,8 @@ SELECT * FROM functional_kudu.alltypes WHERE id < 100;
 ---- RESULTS
 : 0
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 0.*
+NumModifiedRows: 0
+NumRowErrors: 100
 ====
 ---- QUERY
 INSERT INTO kudu_test_tbl
@@ -361,42 +375,53 @@ SELECT * FROM functional_kudu.alltypes;
 ---- RESULTS
 : 7200
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7200.*
+NumModifiedRows: 7200
+NumRowErrors: 100
 ====
 ---- QUERY
 # Test a larger UPDATE
 UPDATE kudu_test_tbl SET int_col = -1;
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7300.*
+NumModifiedRows: 7300
+NumRowErrors: 0
 ====
 ---- QUERY
 # Test a larger DELETE
 DELETE FROM kudu_test_tbl WHERE id > -1;
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumModifiedRows: 7300.*
+NumModifiedRows: 7300
+NumRowErrors: 0
 ====
 ---- QUERY
 # Insert rows that are not covered by any of the existing range partitions
+# Only the row at 10000 is inserted.
 INSERT INTO kudu_test_tbl SELECT cast(id + 10000 as int), bool_col, tinyint_col,
   smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
   timestamp_col, year, month
 FROM functional_kudu.alltypes
----- CATCH
-Kudu error(s) reported, first error: Not found: No tablet covering the requested range partition: NonCoveredRange { lower_bound: (int32 id=10001), upper_bound: (<end>)
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 7299
 ====
 ---- QUERY
 # Try to delete a row with a primary key value that is not covered by the existing range
-# partitions
+# partitions. This doesn't actually end up selecting any rows to modify.
 DELETE FROM kudu_test_tbl WHERE id = 10001
 ---- RESULTS
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 0
 ====
 ---- QUERY
 # Try to update a row with a primary key value that is not covered by the existing range
-# partitions
+# partitions. This doesn't actually end up selecting any rows to modify.
 UPDATE kudu_test_tbl SET int_col = 10 WHERE id = 10001
 ---- RESULTS
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 0
 ====
 ---- QUERY
 # IMPALA-2521: clustered insert into table.
@@ -414,6 +439,9 @@ from functional_kudu.alltypessmall group by id, name
 ) as sub;
 ---- RESULTS
 : 10
+---- RUNTIME_PROFILE
+NumModifiedRows: 10
+NumRowErrors: 0
 ====
 ---- QUERY
 select * from impala_2521
@@ -442,7 +470,10 @@ create table allkeytypes (i1 tinyint, i2 smallint, i3 int, i4 bigint, name strin
 ---- QUERY
 insert into allkeytypes select cast(id as tinyint), smallint_col, int_col,
   cast (bigint_col/10 as bigint), string_col, float_col, double_col
-  from functional.alltypes where id > 0 and id < 4
+  from functional.alltypes where id > 0 and id < 10
 ---- RESULTS
 : 3
+---- RUNTIME_PROFILE
+NumModifiedRows: 3
+NumRowErrors: 6
 ====
