This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 731f5f3b144e93fdc5b55313a71cc9cec920692e
Author: Brock Noland <[email protected]>
AuthorDate: Sat Dec 1 22:54:58 2018 +0000

    KUDU-1563. Add an INSERT_IGNORE operation
    
    This patch adds an `INSERT_IGNORE' operation which behaves like
    a normal `INSERT' except in the case when a duplicate row error
    would be raised by the primary key having been previously inserted.
    
    In the future if more types of errors need to be ignored, other than
    key errors, we can add session or batch level configurations to set
    which errors should be ignored. I didn’t do that in this patch to keep
    things simple and because ignoring key errors is the only use-case I
    have seen required/requested. At that time UPSERT_IGNORE could
    be added as well.
    
    Follow on patches will add more client support.
    
    Change-Id: I5bfc35e9d27bd5e2d3375b68e6e4716ed671f36c
    Reviewed-on: http://gerrit.cloudera.org:8080/4491
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/client-test.cc                    | 118 ++++++++++++++++------
 src/kudu/client/client.cc                         |   4 +
 src/kudu/client/client.h                          |   6 ++
 src/kudu/client/write_op.cc                       |   9 ++
 src/kudu/client/write_op.h                        |  31 +++++-
 src/kudu/common/row_operations-test.cc            |   6 +-
 src/kudu/common/row_operations.cc                 |   3 +
 src/kudu/common/row_operations.h                  |   2 +-
 src/kudu/common/wire_protocol.proto               |   1 +
 src/kudu/integration-tests/fuzz-itest.cc          |  57 +++++++++--
 src/kudu/tablet/local_tablet_writer.h             |   4 +
 src/kudu/tablet/row_op.cc                         |   6 ++
 src/kudu/tablet/row_op.h                          |   4 +
 src/kudu/tablet/tablet-test-base.h                |  16 ++-
 src/kudu/tablet/tablet-test.cc                    |  49 ++++++++-
 src/kudu/tablet/tablet.cc                         |  46 ++++++---
 src/kudu/tablet/tablet.h                          |   4 +-
 src/kudu/tablet/tablet_bootstrap.cc               |   6 +-
 src/kudu/tablet/tablet_metrics.cc                 |   6 ++
 src/kudu/tablet/tablet_metrics.h                  |   1 +
 src/kudu/tablet/tablet_random_access-test.cc      |  44 +++++---
 src/kudu/tablet/transactions/transaction.cc       |   2 +
 src/kudu/tablet/transactions/transaction.h        |   1 +
 src/kudu/tablet/transactions/write_transaction.cc |  12 ++-
 24 files changed, 359 insertions(+), 79 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 47a2510..a3be4b3 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -353,10 +353,10 @@ class ClientTest : public KuduTest {
 
   // Inserts given number of tests rows into the specified table
   // in the context of the session.
-  void InsertTestRows(KuduTable* table, KuduSession* session,
+  static void InsertTestRows(KuduTable* table, KuduSession* session,
                       int num_rows, int first_row = 0) {
     for (int i = first_row; i < num_rows + first_row; ++i) {
-      unique_ptr<KuduInsert> insert(BuildTestRow(table, i));
+      unique_ptr<KuduInsert> insert(BuildTestInsert(table, i));
       ASSERT_OK(session->Apply(insert.release()));
     }
   }
@@ -401,17 +401,28 @@ class ClientTest : public KuduTest {
     NO_FATALS(CheckNoRpcOverflow());
   }
 
-  unique_ptr<KuduInsert> BuildTestRow(KuduTable* table, int index) {
+  static unique_ptr<KuduInsert> BuildTestInsert(KuduTable* table, int index) {
     unique_ptr<KuduInsert> insert(table->NewInsert());
     KuduPartialRow* row = insert->mutable_row();
+    PopulateDefaultRow(row, index);
+    return insert;
+  }
+
+  static unique_ptr<KuduInsertIgnore> BuildTestInsertIgnore(KuduTable* table, 
int index) {
+    unique_ptr<KuduInsertIgnore> insert(table->NewInsertIgnore());
+    KuduPartialRow* row = insert->mutable_row();
+    PopulateDefaultRow(row, index);
+    return insert;
+  }
+
+  static void PopulateDefaultRow(KuduPartialRow* row, int index) {
     CHECK_OK(row->SetInt32(0, index));
     CHECK_OK(row->SetInt32(1, index * 2));
     CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", index))));
     CHECK_OK(row->SetInt32(3, index * 3));
-    return insert;
   }
 
-  unique_ptr<KuduUpdate> UpdateTestRow(KuduTable* table, int index) {
+  static unique_ptr<KuduUpdate> UpdateTestRow(KuduTable* table, int index) {
     unique_ptr<KuduUpdate> update(table->NewUpdate());
     KuduPartialRow* row = update->mutable_row();
     CHECK_OK(row->SetInt32(0, index));
@@ -420,14 +431,13 @@ class ClientTest : public KuduTest {
     return update;
   }
 
-  unique_ptr<KuduDelete> DeleteTestRow(KuduTable* table, int index) {
+  static unique_ptr<KuduDelete> DeleteTestRow(KuduTable* table, int index) {
     unique_ptr<KuduDelete> del(table->NewDelete());
     KuduPartialRow* row = del->mutable_row();
     CHECK_OK(row->SetInt32(0, index));
     return del;
   }
 
-
   void DoTestScanResourceMetrics() {
     KuduScanner scanner(client_table_.get());
     string tablet_id = GetFirstTabletId(client_table_.get());
@@ -1066,13 +1076,13 @@ TEST_P(ScanMultiTabletParamTest, Test) {
   session->SetTimeoutMillis(5000);
   for (int i = 1; i < kTabletsNum; ++i) {
     unique_ptr<KuduInsert> insert;
-    insert = BuildTestRow(table.get(), 2 + i * kRowsPerTablet);
+    insert = BuildTestInsert(table.get(), 2 + i * kRowsPerTablet);
     ASSERT_OK(session->Apply(insert.release()));
-    insert = BuildTestRow(table.get(), 3 + i * kRowsPerTablet);
+    insert = BuildTestInsert(table.get(), 3 + i * kRowsPerTablet);
     ASSERT_OK(session->Apply(insert.release()));
-    insert = BuildTestRow(table.get(), 5 + i * kRowsPerTablet);
+    insert = BuildTestInsert(table.get(), 5 + i * kRowsPerTablet);
     ASSERT_OK(session->Apply(insert.release()));
-    insert = BuildTestRow(table.get(), 7 + i * kRowsPerTablet);
+    insert = BuildTestInsert(table.get(), 7 + i * kRowsPerTablet);
     ASSERT_OK(session->Apply(insert.release()));
   }
   FlushSessionOrDie(session);
@@ -1608,13 +1618,13 @@ TEST_F(ClientTest, TestNonCoveringRangePartitions) {
   ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
   session->SetTimeoutMillis(60000);
   vector<unique_ptr<KuduInsert>> out_of_range_inserts;
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), -50));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), -1));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 100));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 150));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 199));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 300));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 350));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), -50));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), -1));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 100));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 150));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 199));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 300));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 350));
 
   for (auto& insert : out_of_range_inserts) {
     client_->data_->meta_cache_->ClearCache();
@@ -1728,7 +1738,7 @@ TEST_F(ClientTest, 
TestOpenTableClearsNonCoveringRangePartitions) {
   // Attempt to insert into the non-covered range, priming the meta cache.
   shared_ptr<KuduSession> session = client_->NewSession();
   ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
-  ASSERT_TRUE(session->Apply(BuildTestRow(table.get(), 
1).release()).IsIOError());
+  ASSERT_TRUE(session->Apply(BuildTestInsert(table.get(), 
1).release()).IsIOError());
   {
     vector<KuduError*> errors;
     ElementDeleter drop(&errors);
@@ -1758,7 +1768,7 @@ TEST_F(ClientTest, 
TestOpenTableClearsNonCoveringRangePartitions) {
 
   // Attempt to insert again into the non-covered range. It should still fail,
   // because the meta cache still contains the non-covered entry.
-  ASSERT_TRUE(session->Apply(BuildTestRow(table.get(), 
1).release()).IsIOError());
+  ASSERT_TRUE(session->Apply(BuildTestInsert(table.get(), 
1).release()).IsIOError());
   {
     vector<KuduError*> errors;
     ElementDeleter drop(&errors);
@@ -1772,7 +1782,7 @@ TEST_F(ClientTest, 
TestOpenTableClearsNonCoveringRangePartitions) {
   // Re-open the table, and attempt to insert again.  This time the meta cache
   // should clear non-covered entries, and the insert should succeed.
   ASSERT_OK(client_->OpenTable(kTableName, &table));
-  ASSERT_OK(session->Apply(BuildTestRow(table.get(), 1).release()));
+  ASSERT_OK(session->Apply(BuildTestInsert(table.get(), 1).release()));
 }
 
 TEST_F(ClientTest, TestExclusiveInclusiveRangeBounds) {
@@ -1814,13 +1824,13 @@ TEST_F(ClientTest, TestExclusiveInclusiveRangeBounds) {
   ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
   session->SetTimeoutMillis(60000);
   vector<unique_ptr<KuduInsert>> out_of_range_inserts;
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), -50));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), -1));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 100));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 150));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 199));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 300));
-  out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 350));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), -50));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), -1));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 100));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 150));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 199));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 300));
+  out_of_range_inserts.emplace_back(BuildTestInsert(table.get(), 350));
 
   for (auto& insert : out_of_range_inserts) {
     Status result = session->Apply(insert.release());
@@ -2411,12 +2421,60 @@ TEST_F(ClientTest, TestInsertSingleRowManualBatch) {
   // Retry
   ASSERT_OK(insert->mutable_row()->SetInt32("key", 12345));
   ASSERT_OK(session->Apply(insert.release()));
-  ASSERT_TRUE(insert == nullptr) << "Successful insert should take ownership";
   ASSERT_TRUE(session->HasPendingOperations()) << "Should be pending until we 
Flush";
 
   FlushSessionOrDie(session);
 }
 
+static void DoTestInsertIgnoreVerifyRows(const shared_ptr<KuduTable>& tbl, int 
num_rows) {
+  vector<string> rows;
+  KuduScanner scanner(tbl.get());
+  ASSERT_OK(ScanToStrings(&scanner, &rows));
+  ASSERT_EQ(num_rows, rows.size());
+  for (int i = 0; i < num_rows; i++) {
+    int key = i + 1;
+    ASSERT_EQ(StringPrintf("(int32 key=%d, int32 int_val=%d, string 
string_val=\"hello %d\", "
+        "int32 non_null_with_default=%d)", key, key*2, key, key*3), rows[i]);
+  }
+}
+
+TEST_F(ClientTest, TestInsertIgnore) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  {
+    unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+    ASSERT_OK(session->Apply(insert.release()));
+    DoTestInsertIgnoreVerifyRows(client_table_, 1);
+  }
+
+  {
+    // INSERT IGNORE results in no error on duplicate primary key
+    unique_ptr<KuduInsertIgnore> 
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1));
+    ASSERT_OK(session->Apply(insert_ignore.release()));
+    DoTestInsertIgnoreVerifyRows(client_table_, 1);
+  }
+
+  {
+    // INSERT IGNORE cannot update row
+    unique_ptr<KuduInsertIgnore> 
insert_ignore(client_table_->NewInsertIgnore());
+    ASSERT_OK(insert_ignore->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(insert_ignore->mutable_row()->SetInt32("int_val", 999));
+    ASSERT_OK(insert_ignore->mutable_row()->SetStringCopy("string_val", "hello 
world"));
+    ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default", 
999));
+    ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but 
results in no change
+    DoTestInsertIgnoreVerifyRows(client_table_, 1);
+  }
+
+  {
+    // INSERT IGNORE can insert new row
+    unique_ptr<KuduInsertIgnore> 
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2));
+    ASSERT_OK(session->Apply(insert_ignore.release()));
+    DoTestInsertIgnoreVerifyRows(client_table_, 2);
+  }
+}
+
 TEST_F(ClientTest, TestInsertAutoFlushSync) {
   shared_ptr<KuduSession> session = client_->NewSession();
   ASSERT_FALSE(session->HasPendingOperations());
@@ -5089,7 +5147,7 @@ TEST_F(ClientTest, TestReadAtSnapshotNoTimestampSet) {
     shared_ptr<KuduSession> session(client_->NewSession());
     ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
     for (size_t i = 0; i < kTabletsNum * kRowsPerTablet; ++i) {
-      unique_ptr<KuduInsert> insert(BuildTestRow(table.get(), i));
+      unique_ptr<KuduInsert> insert(BuildTestInsert(table.get(), i));
       ASSERT_OK(session->Apply(insert.release()));
     }
     FlushSessionOrDie(session);
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 0e1a1b2..ba7fa2b 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -929,6 +929,10 @@ KuduInsert* KuduTable::NewInsert() {
   return new KuduInsert(shared_from_this());
 }
 
+KuduInsertIgnore* KuduTable::NewInsertIgnore() {
+  return new KuduInsertIgnore(shared_from_this());
+}
+
 KuduUpsert* KuduTable::NewUpsert() {
   return new KuduUpsert(shared_from_this());
 }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 1746822..dd58883 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -72,6 +72,7 @@ namespace client {
 
 class KuduDelete;
 class KuduInsert;
+class KuduInsertIgnore;
 class KuduLoggingCallback;
 class KuduPartitioner;
 class KuduScanBatch;
@@ -1038,6 +1039,11 @@ class KUDU_EXPORT KuduTable : public 
sp::enable_shared_from_this<KuduTable> {
   ///   KuduSession::Apply().
   KuduInsert* NewInsert();
 
+  /// @return New @c INSERT_IGNORE operation for this table. It is the
+  ///   caller's responsibility to free the result, unless it is passed to
+  ///   KuduSession::Apply().
+  KuduInsertIgnore* NewInsertIgnore();
+
   /// @return New @c UPSERT operation for this table. It is the caller's
   ///   responsibility to free the result, unless it is passed to
   ///   KuduSession::Apply().
diff --git a/src/kudu/client/write_op.cc b/src/kudu/client/write_op.cc
index b159b87..83d7e19 100644
--- a/src/kudu/client/write_op.cc
+++ b/src/kudu/client/write_op.cc
@@ -44,6 +44,7 @@ RowOperationsPB_Type 
ToInternalWriteType(KuduWriteOperation::Type type) {
     case KuduWriteOperation::UPDATE: return RowOperationsPB_Type_UPDATE;
     case KuduWriteOperation::DELETE: return RowOperationsPB_Type_DELETE;
     case KuduWriteOperation::UPSERT: return RowOperationsPB_Type_UPSERT;
+    case KuduWriteOperation::INSERT_IGNORE: return 
RowOperationsPB_Type_INSERT_IGNORE;
     default: LOG(FATAL) << "Unexpected write operation type: " << type;
   }
 }
@@ -105,6 +106,14 @@ KuduInsert::KuduInsert(const shared_ptr<KuduTable>& table)
 
 KuduInsert::~KuduInsert() {}
 
+// InsertIgnore 
-----------------------------------------------------------------
+
+KuduInsertIgnore::KuduInsertIgnore(const shared_ptr<KuduTable>& table)
+  : KuduWriteOperation(table) {
+}
+
+KuduInsertIgnore::~KuduInsertIgnore() {}
+
 // Update 
-----------------------------------------------------------------------
 
 KuduUpdate::KuduUpdate(const shared_ptr<KuduTable>& table)
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index a0a5f61..73265f4 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -69,7 +69,8 @@ class KUDU_EXPORT KuduWriteOperation {
     INSERT = 1,
     UPDATE = 2,
     DELETE = 3,
-    UPSERT = 4
+    UPSERT = 4,
+    INSERT_IGNORE = 5
   };
   virtual ~KuduWriteOperation();
 
@@ -161,6 +162,34 @@ class KUDU_EXPORT KuduInsert : public KuduWriteOperation {
   explicit KuduInsert(const sp::shared_ptr<KuduTable>& table);
 };
 
+
+/// @brief A single row insert ignore to be sent to the cluster, duplicate row 
errors are ignored.
+///
+/// @pre An insert ignore requires all key columns to be set, as well as all
+///   columns which do not have default values.
+class KUDU_EXPORT KuduInsertIgnore : public KuduWriteOperation {
+ public:
+  virtual ~KuduInsertIgnore();
+
+  /// @copydoc KuduWriteOperation::ToString()
+  virtual std::string ToString() const OVERRIDE { return "INSERT IGNORE " + 
row_.ToString(); }
+
+ protected:
+  /// @cond PROTECTED_MEMBERS_DOCUMENTED
+
+  /// @copydoc KuduWriteOperation::type()
+  virtual Type type() const OVERRIDE {
+    return INSERT_IGNORE;
+  }
+
+  /// @endcond
+
+ private:
+  friend class KuduTable;
+  explicit KuduInsertIgnore(const sp::shared_ptr<KuduTable>& table);
+};
+
+
 /// @brief A single row upsert to be sent to the cluster.
 ///
 /// See KuduInsert for more details.
diff --git a/src/kudu/common/row_operations-test.cc 
b/src/kudu/common/row_operations-test.cc
index d28a490..fec5efd 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <cstdlib>
+#include <initializer_list>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -124,7 +125,7 @@ void RowOperationsTest::CheckDecodeDoesntCrash(const 
Schema& client_schema,
 void RowOperationsTest::DoFuzzTest(const Schema& server_schema,
                                    const KuduPartialRow& row,
                                    int n_random_changes) {
-  for (int operation = 0; operation <= 8; operation++) {
+  for (int operation = 0; operation <= 9; operation++) {
     RowOperationsPB pb;
     RowOperationsPBEncoder enc(&pb);
 
@@ -156,6 +157,9 @@ void RowOperationsTest::DoFuzzTest(const Schema& 
server_schema,
       case 8:
         enc.Add(RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND, row);
         break;
+      case 9:
+        enc.Add(RowOperationsPB::INSERT_IGNORE, row);
+        break;
     }
 
     const Schema* client_schema = row.schema();
diff --git a/src/kudu/common/row_operations.cc 
b/src/kudu/common/row_operations.cc
index 548209f..1788a17 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -65,6 +65,8 @@ string DecodedRowOperation::ToString(const Schema& schema) 
const {
       return "UNKNOWN";
     case RowOperationsPB::INSERT:
       return "INSERT " + schema.DebugRow(ConstContiguousRow(&schema, 
row_data));
+    case RowOperationsPB::INSERT_IGNORE:
+      return "INSERT IGNORE " + schema.DebugRow(ConstContiguousRow(&schema, 
row_data));
     case RowOperationsPB::UPSERT:
       return "UPSERT " + schema.DebugRow(ConstContiguousRow(&schema, 
row_data));
     case RowOperationsPB::UPDATE:
@@ -693,6 +695,7 @@ Status 
RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
     case RowOperationsPB::UNKNOWN:
       return Status::NotSupported("Unknown row operation type");
     case RowOperationsPB::INSERT:
+    case RowOperationsPB::INSERT_IGNORE:
     case RowOperationsPB::UPSERT:
       RETURN_NOT_OK(DecodeInsertOrUpsert(prototype_row_storage, mapping, op));
       break;
diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h
index e3e441a..5835556 100644
--- a/src/kudu/common/row_operations.h
+++ b/src/kudu/common/row_operations.h
@@ -75,7 +75,7 @@ class RowOperationsPBEncoder {
 struct DecodedRowOperation {
   RowOperationsPB::Type type;
 
-  // For INSERT or UPSERT, the whole projected row.
+  // For INSERT, INSERT_IGNORE, or UPSERT, the whole projected row.
   // For UPDATE or DELETE, the row key.
   const uint8_t* row_data;
 
diff --git a/src/kudu/common/wire_protocol.proto 
b/src/kudu/common/wire_protocol.proto
index 225ce47..719643e 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -162,6 +162,7 @@ message RowOperationsPB {
     UPDATE = 2;
     DELETE = 3;
     UPSERT = 5;
+    INSERT_IGNORE = 10;
 
     // Used when specifying split rows on table creation.
     SPLIT_ROW = 4;
diff --git a/src/kudu/integration-tests/fuzz-itest.cc 
b/src/kudu/integration-tests/fuzz-itest.cc
index 5f0e0de..fd3a904 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -104,6 +104,8 @@ namespace tablet {
 enum TestOpType {
   TEST_INSERT,
   TEST_INSERT_PK_ONLY,
+  TEST_INSERT_IGNORE,
+  TEST_INSERT_IGNORE_PK_ONLY,
   TEST_UPSERT,
   TEST_UPSERT_PK_ONLY,
   TEST_UPDATE,
@@ -124,6 +126,8 @@ const char* kTableName = "table";
 const char* TestOpType_names[] = {
   "TEST_INSERT",
   "TEST_INSERT_PK_ONLY",
+  "TEST_INSERT_IGNORE",
+  "TEST_INSERT_IGNORE_PK_ONLY",
   "TEST_UPSERT",
   "TEST_UPSERT_PK_ONLY",
   "TEST_UPDATE",
@@ -172,6 +176,8 @@ struct TestOp {
         return strings::Substitute("{$0}", TestOpType_names[type]);
       case TEST_INSERT:
       case TEST_INSERT_PK_ONLY:
+      case TEST_INSERT_IGNORE:
+      case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
       case TEST_UPDATE:
@@ -214,6 +220,8 @@ struct Redo {
 
 const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_INSERT_PK_ONLY,
+                                  TEST_INSERT_IGNORE,
+                                  TEST_INSERT_IGNORE_PK_ONLY,
                                   TEST_UPSERT,
                                   TEST_UPSERT_PK_ONLY,
                                   TEST_UPDATE,
@@ -229,6 +237,7 @@ const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_DIFF_SCAN};
 
 const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
+                                     TEST_INSERT_IGNORE_PK_ONLY,
                                      TEST_UPSERT_PK_ONLY,
                                      TEST_DELETE,
                                      TEST_FLUSH_OPS,
@@ -323,6 +332,8 @@ class FuzzTest : public KuduTest {
     unique_ptr<KuduWriteOperation> op;
     if (type == TEST_INSERT || type == TEST_INSERT_PK_ONLY) {
       op.reset(table_->NewInsert());
+    } else if (type == TEST_INSERT_IGNORE || type == 
TEST_INSERT_IGNORE_PK_ONLY) {
+      op.reset(table_->NewInsertIgnore());
     } else {
       op.reset(table_->NewUpsert());
     }
@@ -331,6 +342,7 @@ class FuzzTest : public KuduTest {
     ret.key = key;
     switch (type) {
       case TEST_INSERT:
+      case TEST_INSERT_IGNORE:
       case TEST_UPSERT: {
         if (val & 1) {
           CHECK_OK(row->SetNull(1));
@@ -338,13 +350,18 @@ class FuzzTest : public KuduTest {
           CHECK_OK(row->SetInt32(1, val));
           ret.val = val;
         }
+        if (type == TEST_INSERT_IGNORE && old_row) {
+          // insert ignore when the row already exists results in old value
+          ret.val = old_row->val;
+        }
         break;
       }
       case TEST_INSERT_PK_ONLY:
         break;
+      case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_UPSERT_PK_ONLY: {
-        // For "upsert PK only", we expect the row to keep its old value if
-        // the row existed, or NULL if there was no old row.
+        // For "upsert PK only" and "insert ignore PK only", we expect the row
+        // to keep its old value if the row existed, or NULL if there was no 
old row.
         ret.val = old_row ? old_row->val : boost::none;
         break;
       }
@@ -687,6 +704,8 @@ bool IsMutation(const TestOpType& op) {
   switch (op) {
     case TEST_INSERT:
     case TEST_INSERT_PK_ONLY:
+    case TEST_INSERT_IGNORE:
+    case TEST_INSERT_IGNORE_PK_ONLY:
     case TEST_UPSERT:
     case TEST_UPSERT_PK_ONLY:
     case TEST_UPDATE:
@@ -726,10 +745,20 @@ void GenerateTestCase(vector<TestOp>* ops, int len, 
TestOpSets sets = ALL) {
         ops_pending = true;
         data_in_mrs = true;
         break;
+      case TEST_INSERT_IGNORE:
+      case TEST_INSERT_IGNORE_PK_ONLY:
+        ops->emplace_back(r, row_key);
+        ops_pending = true;
+        // If the row doesn't currently exist, this will act like an insert
+        // and put it into MRS.
+        if (!exists[row_key]) {
+          data_in_mrs = true;
+        }
+        exists[row_key] = true;
+        break;
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
         ops->emplace_back(r, row_key);
-        exists[row_key] = true;
         ops_pending = true;
         // If the row doesn't currently exist, this will act like an insert
         // and put it into MRS.
@@ -740,6 +769,7 @@ void GenerateTestCase(vector<TestOp>* ops, int len, 
TestOpSets sets = ALL) {
           // a DMS.
           data_in_dms = true;
         }
+        exists[row_key] = true;
         break;
       case TEST_UPDATE:
         if (!exists[row_key]) continue;
@@ -849,6 +879,8 @@ void FuzzTest::ValidateFuzzCase(const vector<TestOp>& 
test_ops) {
         CHECK(!exists[test_op.val]) << "invalid case: inserting 
already-existing row";
         exists[test_op.val] = true;
         break;
+      case TEST_INSERT_IGNORE:
+      case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
         exists[test_op.val] = true;
@@ -888,17 +920,28 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_INSERT_PK_ONLY:
+      case TEST_INSERT_IGNORE:
+      case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY: {
         RedoType rtype = pending_val[test_op.val] ? UPDATE : INSERT;
         pending_val[test_op.val] = InsertOrUpsertRow(
             test_op.val, i++, pending_val[test_op.val], test_op.type);
 
-        // A PK-only UPSERT that is converted into an UPDATE will be dropped
-        // server-side. We must do the same.
-        if (test_op.type != TEST_UPSERT_PK_ONLY || rtype != UPDATE) {
-          pending_redos.emplace_back(rtype, test_op.val, 
pending_val[test_op.val]->val);
+        // An insert ignore on a row that already exists will be dropped 
server-side.
+        // We must do the same.
+        if ((test_op.type == TEST_INSERT_IGNORE || test_op.type == 
TEST_INSERT_IGNORE_PK_ONLY) &&
+            rtype == UPDATE) {
+          break;
         }
+
+        // An "upsert PK-only" that is converted into an update will be 
dropped server-side.
+        // We must do the same.
+        if (test_op.type == TEST_UPSERT_PK_ONLY && rtype == UPDATE) {
+          break;
+        }
+
+        pending_redos.emplace_back(rtype, test_op.val, 
pending_val[test_op.val]->val);
         break;
       }
       case TEST_UPDATE:
diff --git a/src/kudu/tablet/local_tablet_writer.h 
b/src/kudu/tablet/local_tablet_writer.h
index 777bad4..96ebaed 100644
--- a/src/kudu/tablet/local_tablet_writer.h
+++ b/src/kudu/tablet/local_tablet_writer.h
@@ -65,6 +65,10 @@ class LocalTabletWriter {
     return Write(RowOperationsPB::INSERT, row);
   }
 
+  Status InsertIgnore(const KuduPartialRow& row) {
+    return Write(RowOperationsPB::INSERT_IGNORE, row);
+  }
+
   Status Upsert(const KuduPartialRow& row) {
     return Write(RowOperationsPB::UPSERT, row);
   }
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index 464a8ed..efe28e0 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -52,6 +52,12 @@ void RowOp::SetInsertSucceeded(int mrs_id) {
   result->add_mutated_stores()->set_mrs_id(mrs_id);
 }
 
+void RowOp::SetErrorIgnored() {
+  DCHECK(!result) << SecureDebugString(*result);
+  result.reset(new OperationResultPB());
+  error_ignored = true;
+}
+
 void RowOp::SetMutateSucceeded(gscoped_ptr<OperationResultPB> result) {
   DCHECK(!this->result) << SecureDebugString(*result);
   this->result = std::move(result);
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index c63a345..ade5ef9 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -42,6 +42,7 @@ struct RowOp {
   // Only one of the following four functions must be called, at most once.
   void SetFailed(const Status& s);
   void SetInsertSucceeded(int mrs_id);
+  void SetErrorIgnored();
   void SetMutateSucceeded(gscoped_ptr<OperationResultPB> result);
   // Sets the result of a skipped operation on bootstrap.
   // TODO(dralves) Currently this performs a copy. Might be avoided with some 
refactoring.
@@ -92,6 +93,9 @@ struct RowOp {
   // for this op does not exist in any RowSet.
   bool checked_present = false;
 
+  // True if an ignore op was ignored due to an error.
+  bool error_ignored = false;
+
   // The RowSet in which this op's key has been found present and alive.
   // This will be null if 'checked_present' is false, or if it has been
   // checked and found not to be alive in any RowSet.
diff --git a/src/kudu/tablet/tablet-test-base.h 
b/src/kudu/tablet/tablet-test-base.h
index 4a86a2b..763b694 100644
--- a/src/kudu/tablet/tablet-test-base.h
+++ b/src/kudu/tablet/tablet-test-base.h
@@ -319,15 +319,23 @@ class TabletTestBase : public KuduTabletTest {
   void InsertTestRows(int64_t first_row,
                       int64_t count,
                       int32_t val,
-                      TimeSeries *ts = NULL) {
+                      TimeSeries *ts = nullptr) {
     InsertOrUpsertTestRows(RowOperationsPB::INSERT, first_row, count, val, ts);
   }
 
+  // Insert "count" rows, ignoring duplicate key errors.
+  void InsertIgnoreTestRows(int64_t first_row,
+                            int64_t count,
+                            int32_t val,
+                            TimeSeries *ts = nullptr) {
+    InsertOrUpsertTestRows(RowOperationsPB::INSERT_IGNORE, first_row, count, 
val, ts);
+  }
+
   // Upserts "count" rows.
   void UpsertTestRows(int64_t first_row,
                       int64_t count,
                       int32_t val,
-                      TimeSeries *ts = NULL) {
+                      TimeSeries *ts = nullptr) {
     InsertOrUpsertTestRows(RowOperationsPB::UPSERT, first_row, count, val, ts);
   }
 
@@ -335,7 +343,7 @@ class TabletTestBase : public KuduTabletTest {
                               int64_t first_row,
                               int64_t count,
                               int32_t val,
-                              TimeSeries *ts = NULL) {
+                              TimeSeries *ts = nullptr) {
     LocalTabletWriter writer(tablet().get(), &client_schema_);
     KuduPartialRow row(&client_schema_);
 
@@ -344,6 +352,8 @@ class TabletTestBase : public KuduTabletTest {
       setup_.BuildRow(&row, i, val);
       if (type == RowOperationsPB::INSERT) {
         CHECK_OK(writer.Insert(row));
+      } else if (type == RowOperationsPB::INSERT_IGNORE) {
+        CHECK_OK(writer.InsertIgnore(row));
       } else if (type == RowOperationsPB::UPSERT) {
         CHECK_OK(writer.Upsert(row));
       } else {
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 6877358..3f83998 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -24,6 +24,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -748,6 +749,53 @@ TYPED_TEST(TestTablet, TestInsertsPersist) {
   // TODO(unknown): add some more data, re-flush
 }
 
+TYPED_TEST(TestTablet, TestInsertIgnore) {
+  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
+  KuduPartialRow row(&this->client_schema_);
+  vector<string> rows;
+
+  // Single batch, insert then insert ignore of same row, operation should 
succeed
+  this->setup_.BuildRow(&row, 0, 1000);
+  vector<LocalTabletWriter::Op> ops;
+  ops.emplace_back(LocalTabletWriter::Op(RowOperationsPB::INSERT, &row));
+  ops.emplace_back(LocalTabletWriter::Op(RowOperationsPB::INSERT_IGNORE, 
&row));
+  ASSERT_OK(writer.WriteBatch(ops));
+  ASSERT_OK(this->IterateToStringList(&rows));
+  ASSERT_EQ(1, rows.size());
+  EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1000, false) }, 
rows);
+
+  ASSERT_OK(this->DeleteTestRow(&writer, 0));
+
+  ops.clear();
+  this->setup_.BuildRow(&row, 0, 1001);
+  ops.emplace_back(LocalTabletWriter::Op(RowOperationsPB::INSERT_IGNORE, 
&row));
+  ops.emplace_back(LocalTabletWriter::Op(RowOperationsPB::INSERT, &row));
+  Status s = writer.WriteBatch(ops);
+  ASSERT_STR_CONTAINS(s.ToString(), "key already present");
+  ASSERT_OK(this->IterateToStringList(&rows));
+  ASSERT_EQ(1, rows.size());
+  EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1001, false) }, 
rows);
+
+  // INSERT IGNORE a row that is in MRS, ensure value doesn't change
+  this->InsertIgnoreTestRows(0, 1, 2000);
+  ASSERT_OK(this->IterateToStringList(&rows));
+  ASSERT_EQ(1, rows.size());
+  EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1001, false) }, 
rows);
+
+  this->UpsertTestRows(0, 1, 1011);
+
+  // Flush it.
+  ASSERT_OK(this->tablet()->Flush());
+  ASSERT_OK(this->IterateToStringList(&rows));
+  EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1011, false) }, 
rows);
+
+  // INSERT IGNORE a row that is in DRS, ensure value doesn't change.
+  this->InsertIgnoreTestRows(0, 1, 1000);
+  ASSERT_OK(this->IterateToStringList(&rows));
+  ASSERT_EQ(1, rows.size());
+  EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1011, false) }, 
rows);
+}
+
 TYPED_TEST(TestTablet, TestUpsert) {
   vector<string> rows;
   const auto& upserts_as_updates = 
this->tablet()->metrics()->upserts_as_updates;
@@ -776,7 +824,6 @@ TYPED_TEST(TestTablet, TestUpsert) {
   NO_FATALS(this->CheckLiveRowsCount(1));
 }
 
-
 // Test that when a row has been updated many times, it always yields
 // the most recent value.
 TYPED_TEST(TestTablet, TestMultipleUpdates) {
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index dd85ba2..d2a158c 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -557,6 +557,7 @@ bool Tablet::ValidateOpOrMarkFailed(RowOp* op) {
 Status Tablet::ValidateOp(const RowOp& op) {
   switch (op.decoded_op.type) {
     case RowOperationsPB::INSERT:
+    case RowOperationsPB::INSERT_IGNORE:
     case RowOperationsPB::UPSERT:
       return ValidateInsertOrUpsertUnlocked(op);
 
@@ -608,19 +609,27 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* 
io_context,
   DCHECK(op->checked_present);
   DCHECK(op->valid);
 
-  const bool is_upsert = op->decoded_op.type == RowOperationsPB::UPSERT;
+  RowOperationsPB_Type op_type = op->decoded_op.type;
   const TabletComponents* comps = 
DCHECK_NOTNULL(tx_state->tablet_components());
 
   if (op->present_in_rowset) {
-    if (is_upsert) {
-      return ApplyUpsertAsUpdate(io_context, tx_state, op, 
op->present_in_rowset, stats);
-    }
-    Status s = Status::AlreadyPresent("key already present");
-    if (metrics_) {
-      metrics_->insertions_failed_dup_key->Increment();
+    switch (op_type) {
+      case RowOperationsPB::UPSERT:
+        return ApplyUpsertAsUpdate(io_context, tx_state, op, 
op->present_in_rowset, stats);
+      case RowOperationsPB::INSERT_IGNORE:
+        op->SetErrorIgnored();
+        return Status::OK();
+      case RowOperationsPB::INSERT: {
+        Status s = Status::AlreadyPresent("key already present");
+        if (metrics_) {
+          metrics_->insertions_failed_dup_key->Increment();
+        }
+        op->SetFailed(s);
+        return s;
+      }
+      default:
+        LOG(FATAL) << "Unknown operation type: " << op_type;
     }
-    op->SetFailed(s);
-    return s;
   }
 
   Timestamp ts = tx_state->timestamp();
@@ -636,11 +645,19 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* 
io_context,
     op->SetInsertSucceeded(comps->memrowset->mrs_id());
   } else {
     if (s.IsAlreadyPresent()) {
-      if (is_upsert) {
-        return ApplyUpsertAsUpdate(io_context, tx_state, op, 
comps->memrowset.get(), stats);
-      }
-      if (metrics_) {
-        metrics_->insertions_failed_dup_key->Increment();
+      switch (op_type) {
+        case RowOperationsPB::UPSERT:
+          return ApplyUpsertAsUpdate(io_context, tx_state, op, 
comps->memrowset.get(), stats);
+        case RowOperationsPB::INSERT_IGNORE:
+          op->SetErrorIgnored();
+          return Status::OK();
+        case RowOperationsPB::INSERT:
+          if (metrics_) {
+            metrics_->insertions_failed_dup_key->Increment();
+          }
+          break;
+        default:
+          LOG(FATAL) << "Unknown operation type: " << op_type;
       }
     }
     op->SetFailed(s);
@@ -986,6 +1003,7 @@ Status Tablet::ApplyRowOperation(const IOContext* 
io_context,
   Status s;
   switch (row_op->decoded_op.type) {
     case RowOperationsPB::INSERT:
+    case RowOperationsPB::INSERT_IGNORE:
     case RowOperationsPB::UPSERT:
       s = InsertOrUpsertUnlocked(io_context, tx_state, row_op, stats);
       if (s.IsAlreadyPresent()) {
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index c419e01..9c6d810 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -545,8 +545,8 @@ class Tablet {
   // Validate the given update/delete operation.
   static Status ValidateMutateUnlocked(const RowOp& op);
 
-  // Perform an INSERT or UPSERT operation, assuming that the transaction is 
already in
-  // prepared state. This state ensures that:
+  // Perform an INSERT, INSERT_IGNORE, or UPSERT operation, assuming that the 
transaction is
+  // already in a prepared state. This state ensures that:
   // - the row lock is acquired
   // - the tablet components have been acquired
   // - the operation has been decoded
diff --git a/src/kudu/tablet/tablet_bootstrap.cc 
b/src/kudu/tablet/tablet_bootstrap.cc
index 1f3e8b3..992601d 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -421,7 +421,8 @@ class TabletBootstrap {
     // Number of REPLICATE messages for which a matching COMMIT was found.
     int ops_committed;
 
-    // Number inserts/mutations seen and ignored.
+    // Number of inserts/mutations seen and ignored. Note inserts_ignored does 
not refer
+    // to the INSERT_IGNORE operation. It refers to inserts ignored during log 
replay.
     int inserts_seen, inserts_ignored;
     int mutations_seen, mutations_ignored;
 
@@ -1553,8 +1554,9 @@ Status TabletBootstrap::ApplyOperations(const IOContext* 
io_context,
     // Increment the seen/ignored stats.
     switch (op->decoded_op.type) {
       case RowOperationsPB::INSERT:
+      case RowOperationsPB::INSERT_IGNORE:
       case RowOperationsPB::UPSERT: {
-        // TODO: should we have a separate counter for upserts?
+        // TODO(unknown): should we have a separate counter for upserts?
         stats_.inserts_seen++;
         if (op->has_result()) {
           stats_.inserts_ignored++;
diff --git a/src/kudu/tablet/tablet_metrics.cc 
b/src/kudu/tablet/tablet_metrics.cc
index c3d05fe..a135f8d 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -31,6 +31,11 @@ METRIC_DEFINE_counter(tablet, rows_inserted, "Rows Inserted",
     kudu::MetricUnit::kRows,
     "Number of rows inserted into this tablet since service start",
     kudu::MetricLevel::kInfo);
+METRIC_DEFINE_counter(tablet, insert_ignore_errors, "Insert Ignore Errors",
+    kudu::MetricUnit::kRows,
+    "Number of insert ignore operations for this tablet which were "
+    "ignored due to an error since service start",
+    kudu::MetricLevel::kDebug);
 METRIC_DEFINE_counter(tablet, rows_upserted, "Rows Upserted",
     kudu::MetricUnit::kRows,
     "Number of rows upserted into this tablet since service start",
@@ -326,6 +331,7 @@ TabletMetrics::TabletMetrics(const 
scoped_refptr<MetricEntity>& entity)
     MINIT(rows_upserted),
     MINIT(rows_updated),
     MINIT(rows_deleted),
+    MINIT(insert_ignore_errors),
     MINIT(insertions_failed_dup_key),
     MINIT(upserts_as_updates),
     MINIT(scanner_rows_returned),
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 1190b04..7dab1ea 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -48,6 +48,7 @@ struct TabletMetrics {
   scoped_refptr<Counter> rows_upserted;
   scoped_refptr<Counter> rows_updated;
   scoped_refptr<Counter> rows_deleted;
+  scoped_refptr<Counter> insert_ignore_errors;
   scoped_refptr<Counter> insertions_failed_dup_key;
   scoped_refptr<Counter> upserts_as_updates;
 
diff --git a/src/kudu/tablet/tablet_random_access-test.cc 
b/src/kudu/tablet/tablet_random_access-test.cc
index 12455e8..3452742 100644
--- a/src/kudu/tablet/tablet_random_access-test.cc
+++ b/src/kudu/tablet/tablet_random_access-test.cc
@@ -26,7 +26,6 @@
 #include <boost/optional/optional.hpp>
 #include <boost/optional/optional_io.hpp>
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -119,23 +118,34 @@ class TestRandomAccess : public KuduTabletTest {
     vector<LocalTabletWriter::Op> pending;
     for (int i = 0; i < 3; i++) {
       int new_val = rand();
+      int r = rand() % 3;
       if (cur_val == boost::none) {
         // If there is no row, then randomly insert or upsert.
-        if (rand() % 2 == 1) {
-          cur_val = InsertRow(key, new_val, &pending);
-        } else {
-          cur_val = UpsertRow(key, new_val, cur_val, &pending);
+        switch (r) {
+          case 1:
+            cur_val = InsertRow(key, new_val, &pending);
+            break;
+          case 2:
+            cur_val = InsertIgnoreRow(key, new_val, &pending);
+            break;
+          default:
+            cur_val = UpsertRow(key, new_val, cur_val, &pending);
         }
       } else {
         if (new_val % (FLAGS_update_delete_ratio + 1) == 0) {
           cur_val = DeleteRow(key, &pending);
         } else {
-          // If we are meant to update an existing row, randomly choose
-          // between update and upsert.
-          if (rand() % 2 == 1) {
-            cur_val = MutateRow(key, new_val, cur_val, &pending);
-          } else {
-            cur_val = UpsertRow(key, new_val, cur_val, &pending);
+          // If row already exists, randomly choose between an update,
+          // upsert, and insert ignore.
+          switch (r) {
+            case 1:
+              cur_val = MutateRow(key, new_val, cur_val, &pending);
+              break;
+            case 2:
+              InsertIgnoreRow(key, new_val, &pending); // won't change 
existing value
+              break;
+            default:
+              cur_val = UpsertRow(key, new_val, cur_val, &pending);
           }
         }
       }
@@ -186,12 +196,16 @@ class TestRandomAccess : public KuduTabletTest {
     }
   }
 
-  // Adds an insert for the given key/value pair to 'ops', returning the new 
stringified
-  // value of the row.
+  // Adds an insert for the given key/value pair to 'ops', returning the 
expected value
   optional<ExpectedKeyValueRow> InsertRow(int key, int val, 
vector<LocalTabletWriter::Op>* ops) {
     return DoRowOp(RowOperationsPB::INSERT, key, val, boost::none, ops);
   }
 
+  optional<ExpectedKeyValueRow> InsertIgnoreRow(int key, int val,
+                                                vector<LocalTabletWriter::Op>* 
ops) {
+    return DoRowOp(RowOperationsPB::INSERT_IGNORE, key, val, boost::none, ops);
+  }
+
   optional<ExpectedKeyValueRow> UpsertRow(int key,
                                           int val,
                                           const optional<ExpectedKeyValueRow>& 
old_row,
@@ -199,8 +213,7 @@ class TestRandomAccess : public KuduTabletTest {
     return DoRowOp(RowOperationsPB::UPSERT, key, val, old_row, ops);
   }
 
-  // Adds an update of the given key/value pair to 'ops', returning the new 
stringified
-  // value of the row.
+  // Adds an update of the given key/value pair to 'ops', returning the 
expected value
   optional<ExpectedKeyValueRow> MutateRow(int key,
                                           uint32_t new_val,
                                           const optional<ExpectedKeyValueRow>& 
old_row,
@@ -223,6 +236,7 @@ class TestRandomAccess : public KuduTabletTest {
       case RowOperationsPB::UPSERT:
       case RowOperationsPB::UPDATE:
       case RowOperationsPB::INSERT:
+      case RowOperationsPB::INSERT_IGNORE:
         switch (val % 2) {
           case 0:
             CHECK_OK(row->SetNull(1));
diff --git a/src/kudu/tablet/transactions/transaction.cc 
b/src/kudu/tablet/transactions/transaction.cc
index 9f17ddc..da6e967 100644
--- a/src/kudu/tablet/transactions/transaction.cc
+++ b/src/kudu/tablet/transactions/transaction.cc
@@ -72,6 +72,7 @@ 
TransactionCompletionCallback::~TransactionCompletionCallback() {}
 
 TransactionMetrics::TransactionMetrics()
   : successful_inserts(0),
+    insert_ignore_errors(0),
     successful_upserts(0),
     successful_updates(0),
     successful_deletes(0),
@@ -80,6 +81,7 @@ TransactionMetrics::TransactionMetrics()
 
 void TransactionMetrics::Reset() {
   successful_inserts = 0;
+  insert_ignore_errors = 0;
   successful_upserts = 0;
   successful_updates = 0;
   successful_deletes = 0;
diff --git a/src/kudu/tablet/transactions/transaction.h 
b/src/kudu/tablet/transactions/transaction.h
index 5671d98..01d3122 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -60,6 +60,7 @@ struct TransactionMetrics {
   TransactionMetrics();
   void Reset();
   int successful_inserts;
+  int insert_ignore_errors;
   int successful_upserts;
   int successful_updates;
   int successful_deletes;
diff --git a/src/kudu/tablet/transactions/write_transaction.cc 
b/src/kudu/tablet/transactions/write_transaction.cc
index c25d3a7..2bfbbde 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -155,7 +155,7 @@ Status WriteTransaction::Prepare() {
   RETURN_NOT_OK_PREPEND(SchemaFromPB(state_->request()->schema(), 
&client_schema),
                         "Cannot decode client schema");
   if (client_schema.has_column_ids()) {
-    // TODO: we have this kind of code a lot - add a new SchemaFromPB variant 
which
+    // TODO(unknown): we have this kind of code a lot - add a new SchemaFromPB 
variant which
     // does this check inline.
     Status s = Status::InvalidArgument("User requests should not have Column 
IDs");
     state_->completion_callback()->set_error(s, 
TabletServerErrorPB::INVALID_SCHEMA);
@@ -262,9 +262,10 @@ void WriteTransaction::Finish(TransactionResult result) {
 
   TabletMetrics* metrics = state_->tablet_replica()->tablet()->metrics();
   if (metrics) {
-    // TODO: should we change this so it's actually incremented by the
+    // TODO(unknown): should we change this so it's actually incremented by the
     // Tablet code itself instead of this wrapper code?
     metrics->rows_inserted->IncrementBy(state_->metrics().successful_inserts);
+    
metrics->insert_ignore_errors->IncrementBy(state_->metrics().insert_ignore_errors);
     metrics->rows_upserted->IncrementBy(state_->metrics().successful_upserts);
     metrics->rows_updated->IncrementBy(state_->metrics().successful_updates);
     metrics->rows_deleted->IncrementBy(state_->metrics().successful_deletes);
@@ -417,6 +418,13 @@ void WriteTransactionState::UpdateMetricsForOp(const 
RowOp& op) {
     case RowOperationsPB::INSERT:
       tx_metrics_.successful_inserts++;
       break;
+    case RowOperationsPB::INSERT_IGNORE:
+      if (op.error_ignored) {
+        tx_metrics_.insert_ignore_errors++;
+      } else {
+        tx_metrics_.successful_inserts++;
+      }
+      break;
     case RowOperationsPB::UPSERT:
       tx_metrics_.successful_upserts++;
       break;

Reply via email to