KUDU-1623. Properly handle UPSERTS that only include PK column

This fixes a bug reported by Chris George in which the tablet server
would crash when handling an UPSERT which was converted into an empty
UPDATE. For example, in pseudo-SQL:

  CREATE TABLE t (
    x INT NOT NULL PRIMARY KEY
  );

  UPSERT INTO t VALUES (1);
  UPSERT INTO t VALUES (1);

Here the second upsert would detect the primary key already existed and
convert into an update containing any non-PK columns. Since there are no
non-PK columns, the update was "empty", and ended up as an empty
RowChangeList. In DEBUG builds, this fired a DCHECK, but in RELEASE
builds, we ended up with an empty RowChangeList in the DMS. At flush
time, this would cause other assertions to fire or crash.

The fix is relatively simple: if we see that an upsert converted into an
empty update, we just drop the delta rather than continuing to insert
into the delta store. This required loosening a check at bootstrap time,
where we previously assumed that every mutation must have been recorded
in at least one store.

The test changes are much larger than the fix itself due to some
refactoring. The main gist of the change is to introduce operations in
tablet_random_access-test and fuzz-itest where we send an UPSERT which
contains only the primary key column. These tests crashed prior to the
fix and pass now.

Change-Id: Ic878f6e51ead5bf91335ddb47536e7c29de11ac4
Reviewed-on: http://gerrit.cloudera.org:8080/4441
Reviewed-by: David Ribeiro Alves <dral...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9911c489
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9911c489
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9911c489

Branch: refs/heads/master
Commit: 9911c489c45b3a261ee50ad1f837387b4953421b
Parents: 4ba39e4
Author: Todd Lipcon <t...@apache.org>
Authored: Fri Sep 16 11:14:07 2016 -0700
Committer: Todd Lipcon <t...@apache.org>
Committed: Mon Sep 19 21:10:54 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/fuzz-itest.cc     |  93 ++++++++++-----
 src/kudu/tablet/key_value_test_schema.h      |  60 ++++++++++
 src/kudu/tablet/tablet.cc                    |  11 +-
 src/kudu/tablet/tablet_bootstrap.cc          |   7 +-
 src/kudu/tablet/tablet_random_access-test.cc | 131 +++++++++++++++-------
 5 files changed, 229 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9911c489/src/kudu/integration-tests/fuzz-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/fuzz-itest.cc 
b/src/kudu/integration-tests/fuzz-itest.cc
index 5629e14..2134a56 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <boost/optional.hpp>
+#include <boost/optional/optional_io.hpp>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -31,6 +33,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/tablet/key_value_test_schema.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -43,6 +46,7 @@ DEFINE_int32(keyspace_size, 2,  "number of distinct primary 
keys to test with");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(use_hybrid_clock);
 
+using boost::optional;
 using std::string;
 using std::vector;
 using std::unique_ptr;
@@ -52,6 +56,7 @@ using std::unique_ptr;
 enum TestOpType {
   TEST_INSERT,
   TEST_UPSERT,
+  TEST_UPSERT_PK_ONLY,
   TEST_UPDATE,
   TEST_DELETE,
   TEST_FLUSH_OPS,
@@ -93,6 +98,7 @@ namespace tablet {
 const char* TestOpType_names[] = {
   "TEST_INSERT",
   "TEST_UPSERT",
+  "TEST_UPSERT_PK_ONLY",
   "TEST_UPDATE",
   "TEST_DELETE",
   "TEST_FLUSH_OPS",
@@ -129,10 +135,7 @@ class FuzzTest : public KuduTest {
     FLAGS_enable_maintenance_manager = false;
     FLAGS_use_hybrid_clock = false;
 
-    KuduSchemaBuilder b;
-    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
-    b.AddColumn("val")->Type(KuduColumnSchema::INT32);
-    CHECK_OK(b.Build(&schema_));
+    schema_ = client::KuduSchemaFromSchema(CreateKeyValueTestSchema());
   }
 
   void SetUp() override {
@@ -192,9 +195,12 @@ class FuzzTest : public KuduTest {
     return tablet_peer_->tablet();
   }
 
-  // Adds an insert for the given key/value pair to 'ops', returning the new 
stringified
-  // value of the row.
-  string InsertOrUpsertRow(int key, int val, TestOpType type) {
+  // Adds an insert for the given key/value pair to 'ops', returning the new 
contents
+  // of the row.
+  ExpectedKeyValueRow InsertOrUpsertRow(int key, int val,
+                                        optional<ExpectedKeyValueRow> old_row,
+                                        TestOpType type) {
+    ExpectedKeyValueRow ret;
     unique_ptr<KuduWriteOperation> op;
     if (type == TEST_INSERT) {
       op.reset(table_->NewInsert());
@@ -203,45 +209,54 @@ class FuzzTest : public KuduTest {
     }
     KuduPartialRow* row = op->mutable_row();
     CHECK_OK(row->SetInt32(0, key));
-    if (val & 1) {
-      CHECK_OK(row->SetNull(1));
+    ret.key = key;
+    if (type != TEST_UPSERT_PK_ONLY) {
+      if (val & 1) {
+        CHECK_OK(row->SetNull(1));
+      } else {
+        CHECK_OK(row->SetInt32(1, val));
+        ret.val = val;
+      }
     } else {
-      CHECK_OK(row->SetInt32(1, val));
+      // For "upsert PK only", we expect the row to keep its old value if
+      // the row existed, or NULL if there was no old row.
+      ret.val = old_row ? old_row->val : boost::none;
     }
-    string ret = row->ToString();
     CHECK_OK(session_->Apply(op.release()));
     return ret;
   }
 
-  // Adds an update of the given key/value pair to 'ops', returning the new 
stringified
-  // value of the row.
-  string MutateRow(int key, uint32_t new_val) {
+  // Adds an update of the given key/value pair to 'ops', returning the new 
contents
+  // of the row.
+  ExpectedKeyValueRow MutateRow(int key, uint32_t new_val) {
+    ExpectedKeyValueRow ret;
     unique_ptr<KuduUpdate> update(table_->NewUpdate());
     KuduPartialRow* row = update->mutable_row();
     CHECK_OK(row->SetInt32(0, key));
+    ret.key = key;
     if (new_val & 1) {
       CHECK_OK(row->SetNull(1));
     } else {
       CHECK_OK(row->SetInt32(1, new_val));
+      ret.val = new_val;
     }
-    string ret = row->ToString();
     CHECK_OK(session_->Apply(update.release()));
     return ret;
   }
 
-  // Adds a delete of the given row to 'ops', returning an empty string 
(indicating that
+  // Adds a delete of the given row to 'ops', returning boost::none 
(indicating that
   // the row no longer exists).
-  string DeleteRow(int key) {
+  optional<ExpectedKeyValueRow> DeleteRow(int key) {
     unique_ptr<KuduDelete> del(table_->NewDelete());
     KuduPartialRow* row = del->mutable_row();
     CHECK_OK(row->SetInt32(0, key));
     CHECK_OK(session_->Apply(del.release()));
-    return "";
+    return boost::none;
   }
 
   // Random-read the given row, returning its current value.
-  // If the row doesn't exist, returns "()".
-  string GetRow(int key) {
+  // If the row doesn't exist, returns boost::none.
+  optional<ExpectedKeyValueRow> GetRow(int key) {
     KuduScanner s(table_.get());
     CHECK_OK(s.AddConjunctPredicate(table_->NewComparisonPredicate(
         "key", KuduPredicate::EQUAL, KuduValue::FromInt(key))));
@@ -250,10 +265,16 @@ class FuzzTest : public KuduTest {
       KuduScanBatch batch;
       CHECK_OK(s.NextBatch(&batch));
       for (KuduScanBatch::RowPtr row : batch) {
-        return row.ToString();
+        ExpectedKeyValueRow ret;
+        CHECK_OK(row.GetInt32(0, &ret.key));
+        if (!row.IsNull(1)) {
+          ret.val = 0;
+          CHECK_OK(row.GetInt32(1, ret.val.get_ptr()));
+        }
+        return ret;
       }
     }
-    return "()";
+    return boost::none;
   }
 
  protected:
@@ -290,7 +311,8 @@ void GenerateTestCase(vector<TestOp>* ops, int len) {
         data_in_mrs = true;
         break;
       case TEST_UPSERT:
-        ops->push_back({TEST_UPSERT, row_key});
+      case TEST_UPSERT_PK_ONLY:
+        ops->push_back({r, row_key});
         exists[row_key] = true;
         ops_pending = true;
         // If the row doesn't currently exist, this will act like an insert
@@ -387,20 +409,23 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
   // into a test method in order to reproduce a failure.
   LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);
 
-  vector<string> cur_val(FLAGS_keyspace_size);
-  vector<string> pending_val(FLAGS_keyspace_size);
+  vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
+  vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
-    string val_in_table = GetRow(test_op.row_key);
-    EXPECT_EQ("(" + cur_val[test_op.row_key] + ")", val_in_table);
+    optional<ExpectedKeyValueRow> val_in_table = GetRow(test_op.row_key);
+    EXPECT_EQ(cur_val[test_op.row_key], val_in_table);
 
     LOG(INFO) << test_op.ToString();
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_UPSERT:
-        pending_val[test_op.row_key] = InsertOrUpsertRow(test_op.row_key, i++, 
test_op.type);
+      case TEST_UPSERT_PK_ONLY: {
+        pending_val[test_op.row_key] = InsertOrUpsertRow(
+            test_op.row_key, i++, pending_val[test_op.row_key], test_op.type);
         break;
+      }
       case TEST_UPDATE:
         for (int j = 0; j < update_multiplier; j++) {
           pending_val[test_op.row_key] = MutateRow(test_op.row_key, i++);
@@ -669,5 +694,17 @@ TEST_F(FuzzTest, TestUpsertSeq) {
     });
 }
 
+// Regression test for KUDU-1623: upserts with only the primary key
+// columns specified can cause crashes and issues at restart.
+TEST_F(FuzzTest, TestUpsert_PKOnly) {
+  RunFuzzCase({
+      {TEST_INSERT, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_UPSERT_PK_ONLY, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_RESTART_TS, 0}
+    });
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9911c489/src/kudu/tablet/key_value_test_schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/key_value_test_schema.h 
b/src/kudu/tablet/key_value_test_schema.h
new file mode 100644
index 0000000..e81a4f2
--- /dev/null
+++ b/src/kudu/tablet/key_value_test_schema.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file contains a schema and some utility code for a simple
+// "Key-Value" table, with int32 keys and nullable int32 values. This
+// is used by a few tests.
+#pragma once
+
+#include <boost/optional.hpp>
+#include <boost/optional/optional_io.hpp>
+#include <iostream>
+
+#include "kudu/common/schema.h"
+
+namespace kudu {
+
+// Struct for keeping track of what we think the contents of a row should be 
after
+// our random mutations.
+struct ExpectedKeyValueRow {
+  // Non-nullable key.
+  int32_t key;
+
+  // Nullable value.
+  boost::optional<int32_t> val;
+
+  bool operator==(const ExpectedKeyValueRow& other) const {
+    return key == other.key && val == other.val;
+  }
+};
+
+inline Schema CreateKeyValueTestSchema() {
+  return Schema({ColumnSchema("key", INT32),
+                 ColumnSchema("val", INT32, true) }, 1);
+}
+
+inline std::ostream& operator<<(std::ostream& o, const ExpectedKeyValueRow& t) 
{
+  o << "{" << t.key << ", ";
+  if (t.val == boost::none) {
+    o << "NULL";
+  } else {
+    o << *t.val;
+  }
+  return o << "}";
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9911c489/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 508aef7..8b7e0e4 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -457,9 +457,18 @@ Status Tablet::ApplyUpsertAsUpdate(WriteTransactionState* 
tx_state,
     enc.AddColumnUpdate(c, schema->column_id(i), val);
   }
 
+  // If the UPSERT just included the primary key columns, and the rest
+  // were unset (eg because the table only _has_ primary keys, or because
+  // the rest are intended to be set to their defaults), we need to
+  // avoid doing anything.
+  gscoped_ptr<OperationResultPB> result(new OperationResultPB());
+  if (enc.is_empty()) {
+    upsert->SetMutateSucceeded(std::move(result));
+    return Status::OK();
+  }
+
   RowChangeList rcl = enc.as_changelist();
 
-  gscoped_ptr<OperationResultPB> result(new OperationResultPB());
   Status s = rowset->MutateRow(tx_state->timestamp(),
                                *upsert->key_probe,
                                rcl,

http://git-wip-us.apache.org/repos/asf/kudu/blob/9911c489/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc 
b/src/kudu/tablet/tablet_bootstrap.cc
index 845f4af..8650e81 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1448,10 +1448,13 @@ Status TabletBootstrap::FilterOperation(const 
OperationResultPB& op_result,
   }
 
   int num_mutated_stores = op_result.mutated_stores_size();
-  if (PREDICT_FALSE(num_mutated_stores == 0 || num_mutated_stores > 2)) {
-    return Status::Corruption(Substitute("All operations must have one or two 
mutated_stores: $0",
+  if (PREDICT_FALSE(num_mutated_stores > 2)) {
+    return Status::Corruption(Substitute("All operations must have at most two 
mutated_stores: $0",
                                          op_result.ShortDebugString()));
   }
+  // NOTE: it's possible that num_mutated_stores = 0 in the case of an
+  // UPSERT which only specified the primary key. In that case, if the
+  // row already existed, it gets dropped without converting into an UPDATE.
 
   // The mutation may have been duplicated, so we'll check whether any of the
   // output targets was active.

http://git-wip-us.apache.org/repos/asf/kudu/blob/9911c489/src/kudu/tablet/tablet_random_access-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_random_access-test.cc 
b/src/kudu/tablet/tablet_random_access-test.cc
index ae9822c..8d9304f 100644
--- a/src/kudu/tablet/tablet_random_access-test.cc
+++ b/src/kudu/tablet/tablet_random_access-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <boost/optional.hpp>
+#include <boost/optional/optional_io.hpp>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -24,6 +26,7 @@
 
 #include "kudu/common/schema.h"
 #include "kudu/gutil/casts.h"
+#include "kudu/tablet/key_value_test_schema.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet-test-base.h"
 #include "kudu/util/stopwatch.h"
@@ -36,6 +39,7 @@ DEFINE_int32(update_delete_ratio, 4, "ratio of update:delete 
when mutating exist
 
 DECLARE_int32(deltafile_default_block_size);
 
+using boost::optional;
 using std::string;
 using std::vector;
 
@@ -51,9 +55,8 @@ namespace tablet {
 class TestRandomAccess : public KuduTabletTest {
  public:
   TestRandomAccess()
-    : KuduTabletTest(Schema({ ColumnSchema("key", INT32),
-                              ColumnSchema("val", INT32, true) }, 1)),
-      done_(1) {
+      : KuduTabletTest(CreateKeyValueTestSchema()),
+        done_(1) {
     OverrideFlagForSlowTests("keyspace_size", "30000");
     OverrideFlagForSlowTests("runtime_seconds", "10");
     OverrideFlagForSlowTests("sleep_between_background_ops_ms", "1000");
@@ -67,6 +70,7 @@ class TestRandomAccess : public KuduTabletTest {
   virtual void SetUp() OVERRIDE {
     KuduTabletTest::SetUp();
     writer_.reset(new LocalTabletWriter(tablet().get(), &client_schema_));
+    SeedRandom();
   }
 
   // Pick a random row of the table, verify its current state, and then
@@ -81,21 +85,21 @@ class TestRandomAccess : public KuduTabletTest {
   // and validates the correct errors.
   void DoRandomBatch() {
     int key = rand() % expected_tablet_state_.size();
-    string& cur_val = expected_tablet_state_[key];
+    optional<ExpectedKeyValueRow>& cur_val = expected_tablet_state_[key];
 
     // Check that a read yields what we expect.
-    string val_in_table = GetRow(key);
-    ASSERT_EQ("(" + cur_val + ")", val_in_table);
+    optional<ExpectedKeyValueRow> val_in_table = GetRow(key);
+    ASSERT_EQ(cur_val, val_in_table);
 
     vector<LocalTabletWriter::Op> pending;
     for (int i = 0; i < 3; i++) {
       int new_val = rand();
-      if (cur_val.empty()) {
+      if (cur_val == boost::none) {
         // If there is no row, then randomly insert or upsert.
         if (rand() % 2 == 1) {
           cur_val = InsertRow(key, new_val, &pending);
         } else {
-          cur_val = UpsertRow(key, new_val, &pending);
+          cur_val = UpsertRow(key, new_val, cur_val, &pending);
         }
       } else {
         if (new_val % (FLAGS_update_delete_ratio + 1) == 0) {
@@ -104,13 +108,19 @@ class TestRandomAccess : public KuduTabletTest {
           // If we are meant to update an existing row, randomly choose
           // between update and upsert.
           if (rand() % 2 == 1) {
-            cur_val = MutateRow(key, new_val, &pending);
+            cur_val = MutateRow(key, new_val, cur_val, &pending);
           } else {
-            cur_val = UpsertRow(key, new_val, &pending);
+            cur_val = UpsertRow(key, new_val, cur_val, &pending);
           }
         }
       }
     }
+
+    VLOG(1) << "Performing batch:";
+    for (const auto& op : pending) {
+      VLOG(1) << RowOperationsPB::Type_Name(op.type) << " " << 
op.row->ToString();
+    }
+
     CHECK_OK(writer_->WriteBatch(pending));
     for (LocalTabletWriter::Op op : pending) {
       delete op.row;
@@ -153,55 +163,88 @@ class TestRandomAccess : public KuduTabletTest {
 
   // Adds an insert for the given key/value pair to 'ops', returning the new 
stringified
   // value of the row.
-  string InsertRow(int key, int val, vector<LocalTabletWriter::Op>* ops) {
-    return InsertOrUpsertRow(RowOperationsPB::INSERT, key, val, ops);
+  optional<ExpectedKeyValueRow> InsertRow(int key, int val, 
vector<LocalTabletWriter::Op>* ops) {
+    return DoRowOp(RowOperationsPB::INSERT, key, val, boost::none, ops);
   }
 
-  string UpsertRow(int key, int val, vector<LocalTabletWriter::Op>* ops) {
-    return InsertOrUpsertRow(RowOperationsPB::UPSERT, key, val, ops);
-  }
-
-  string InsertOrUpsertRow(RowOperationsPB::Type type, int key, int val,
-                           vector<LocalTabletWriter::Op>* ops) {
-    gscoped_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
-    CHECK_OK(row->SetInt32(0, key));
-    if (val & 1) {
-      CHECK_OK(row->SetNull(1));
-    } else {
-      CHECK_OK(row->SetInt32(1, val));
-    }
-    string ret = row->ToString();
-    ops->push_back(LocalTabletWriter::Op(type, row.release()));
-    return ret;
+  optional<ExpectedKeyValueRow> UpsertRow(int key,
+                                          int val,
+                                          const optional<ExpectedKeyValueRow>& 
old_row,
+                                          vector<LocalTabletWriter::Op>* ops) {
+    return DoRowOp(RowOperationsPB::UPSERT, key, val, old_row, ops);
   }
 
   // Adds an update of the given key/value pair to 'ops', returning the new 
stringified
   // value of the row.
-  string MutateRow(int key, uint32_t new_val, vector<LocalTabletWriter::Op>* 
ops) {
+  optional<ExpectedKeyValueRow> MutateRow(int key,
+                                          uint32_t new_val,
+                                          const optional<ExpectedKeyValueRow>& 
old_row,
+                                          vector<LocalTabletWriter::Op>* ops) {
+    return DoRowOp(RowOperationsPB::UPDATE, key, new_val, old_row, ops);
+  }
+
+  optional<ExpectedKeyValueRow> DoRowOp(RowOperationsPB::Type type,
+                                        int key,
+                                        int val,
+                                        const optional<ExpectedKeyValueRow>& 
old_row,
+                                        vector<LocalTabletWriter::Op>* ops) {
+
     gscoped_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
     CHECK_OK(row->SetInt32(0, key));
-    if (new_val & 1) {
-      CHECK_OK(row->SetNull(1));
-    } else {
-      CHECK_OK(row->SetInt32(1, new_val));
+    optional<ExpectedKeyValueRow> ret = ExpectedKeyValueRow();
+    ret->key = key;
+
+    switch (type) {
+      case RowOperationsPB::UPSERT:
+      case RowOperationsPB::UPDATE:
+      case RowOperationsPB::INSERT:
+        switch (val % 2) {
+          case 0:
+            CHECK_OK(row->SetNull(1));
+            ret->val = boost::none;
+            break;
+          case 1:
+            CHECK_OK(row->SetInt32(1, val));
+            ret->val = val;
+            break;
+        }
+
+        if ((type != RowOperationsPB::UPDATE) && (val % 3 == 1)) {
+          // Don't set the value. In the case of an INSERT or an UPSERT with 
no pre-existing
+          // row, this should default to NULL. Otherwise it should remain set 
to whatever it
+          // was previously set to.
+          CHECK_OK(row->Unset(1));
+
+          if (type == RowOperationsPB::INSERT || old_row == boost::none) {
+            ret->val = boost::none;
+          } else {
+            ret->val = old_row->val;
+          }
+        }
+        break;
+      case RowOperationsPB::DELETE:
+        ret = boost::none;
+        break;
+      default:
+        LOG(FATAL) << "Unknown type: " << type;
     }
-    string ret = row->ToString();
-    ops->push_back(LocalTabletWriter::Op(RowOperationsPB::UPDATE, 
row.release()));
+    ops->push_back(LocalTabletWriter::Op(type, row.release()));
     return ret;
   }
 
+
   // Adds a delete of the given row to 'ops', returning an empty string 
(indicating that
   // the row no longer exists).
-  string DeleteRow(int key, vector<LocalTabletWriter::Op>* ops) {
+  optional<ExpectedKeyValueRow> DeleteRow(int key, 
vector<LocalTabletWriter::Op>* ops) {
     gscoped_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
     CHECK_OK(row->SetInt32(0, key));
     ops->push_back(LocalTabletWriter::Op(RowOperationsPB::DELETE, 
row.release()));
-    return "";
+    return boost::none;
   }
 
   // Random-read the given row, returning its current value.
-  // If the row doesn't exist, returns "()".
-  string GetRow(int key) {
+  // If the row doesn't exist, returns boost::none.
+  optional<ExpectedKeyValueRow> GetRow(int key) {
     ScanSpec spec;
     const Schema& schema = this->client_schema_;
     gscoped_ptr<RowwiseIterator> iter;
@@ -210,7 +253,7 @@ class TestRandomAccess : public KuduTabletTest {
     spec.AddPredicate(pred_one);
     CHECK_OK(iter->Init(&spec));
 
-    string ret = "()";
+    optional<ExpectedKeyValueRow> ret;
     int n_results = 0;
 
     Arena arena(1024, 4*1024*1024);
@@ -229,7 +272,11 @@ class TestRandomAccess : public KuduTabletTest {
           << " and now have new matching row: "
           << schema.DebugRow(block.row(i))
           << "  iterator: " << iter->ToString();
-        ret = schema.DebugRow(block.row(i));
+        ret = ExpectedKeyValueRow();
+        ret->key = *schema.ExtractColumnFromRow<INT32>(block.row(i), 0);
+        if (!block.row(i).is_null(1)) {
+          ret->val = *schema.ExtractColumnFromRow<INT32>(block.row(i), 1);
+        }
         n_results++;
       }
     }
@@ -238,7 +285,7 @@ class TestRandomAccess : public KuduTabletTest {
 
  protected:
   // The current expected state of the tablet.
-  vector<string> expected_tablet_state_;
+  vector<optional<ExpectedKeyValueRow>> expected_tablet_state_;
 
   // Latch triggered when the main thread is finished performing
   // operations. This stops the compact/flush thread.

Reply via email to