This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fe65185d9bc23489dc693d5214fb0d2172325737 Author: Grant Henke <[email protected]> AuthorDate: Mon Oct 26 22:28:16 2020 -0500 KUDU-1563. Add UPDATE_IGNORE and DELETE_IGNORE operations This patch adds an UPDATE_IGNORE operation, which behaves like a normal UPDATE except that no key-not-found error is raised when the primary key has not been previously inserted. This patch also adds a DELETE_IGNORE operation, which behaves like a normal DELETE except that no key-not-found error is raised when the primary key has not been previously inserted. Follow-on patches will add more client support. Note: This patch is a follow-on to https://gerrit.cloudera.org/#/c/4491/. Change-Id: I11dfd06e8d4d22cf1097fe1ff01a1b97cafaf899 Reviewed-on: http://gerrit.cloudera.org:8080/16661 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/client/client-test.cc | 98 +++++++++++++++++++--- src/kudu/client/client.cc | 8 ++ src/kudu/client/client.h | 12 +++ src/kudu/client/write_op.cc | 18 ++++ src/kudu/client/write_op.h | 55 +++++++++++- src/kudu/common/row_operations-test.cc | 22 ++++- src/kudu/common/row_operations.cc | 8 +- src/kudu/common/row_operations.h | 2 +- src/kudu/common/wire_protocol.proto | 2 + src/kudu/integration-tests/fuzz-itest.cc | 96 ++++++++++++++++++--- src/kudu/tablet/local_tablet_writer.h | 8 ++ src/kudu/tablet/ops/op.cc | 4 + src/kudu/tablet/ops/op.h | 3 +- src/kudu/tablet/ops/write_op.cc | 23 +++++ src/kudu/tablet/tablet-test.cc | 64 ++++++++++++++ src/kudu/tablet/tablet.cc | 41 ++++++--- src/kudu/tablet/tablet.h | 4 +- src/kudu/tablet/tablet_bootstrap.cc | 9 +- src/kudu/tablet/tablet_metrics.cc | 12 +++ src/kudu/tablet/tablet_metrics.h | 2 + src/kudu/tablet/tablet_random_access-test.cc | 55 +++++++++--- .../tserver/tablet_server_authorization-test.cc | 21 +++++ 22 files changed, 507 insertions(+), 60 deletions(-) diff --git a/src/kudu/client/client-test.cc 
b/src/kudu/client/client-test.cc index 8f95acc..4445a49 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -413,10 +413,24 @@ class ClientTest : public KuduTest { } static unique_ptr<KuduInsertIgnore> BuildTestInsertIgnore(KuduTable* table, int index) { - unique_ptr<KuduInsertIgnore> insert(table->NewInsertIgnore()); - KuduPartialRow* row = insert->mutable_row(); + unique_ptr<KuduInsertIgnore> insert_ignore(table->NewInsertIgnore()); + KuduPartialRow* row = insert_ignore->mutable_row(); PopulateDefaultRow(row, index); - return insert; + return insert_ignore; + } + + static unique_ptr<KuduUpdateIgnore> BuildTestUpdateIgnore(KuduTable* table, int index) { + unique_ptr<KuduUpdateIgnore> update_ignore(table->NewUpdateIgnore()); + KuduPartialRow* row = update_ignore->mutable_row(); + PopulateDefaultRow(row, index); + return update_ignore; + } + + static unique_ptr<KuduDeleteIgnore> BuildTestDeleteIgnore(KuduTable* table, int index) { + unique_ptr<KuduDeleteIgnore> delete_ignore(table->NewDeleteIgnore()); + KuduPartialRow* row = delete_ignore->mutable_row(); + CHECK_OK(row->SetInt32(0, index)); + return delete_ignore; } static void PopulateDefaultRow(KuduPartialRow* row, int index) { @@ -2509,7 +2523,7 @@ TEST_F(ClientTest, TestInsertSingleRowManualBatch) { FlushSessionOrDie(session); } -static void DoTestInsertIgnoreVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) { +static void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) { vector<string> rows; KuduScanner scanner(tbl.get()); ASSERT_OK(ScanToStrings(&scanner, &rows)); @@ -2529,32 +2543,94 @@ TEST_F(ClientTest, TestInsertIgnore) { { unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1)); ASSERT_OK(session->Apply(insert.release())); - DoTestInsertIgnoreVerifyRows(client_table_, 1); + DoTestVerifyRows(client_table_, 1); } { - // INSERT IGNORE results in no error on duplicate primary key + // INSERT IGNORE results in no error on duplicate 
primary key. unique_ptr<KuduInsertIgnore> insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1)); ASSERT_OK(session->Apply(insert_ignore.release())); - DoTestInsertIgnoreVerifyRows(client_table_, 1); + DoTestVerifyRows(client_table_, 1); } { - // INSERT IGNORE cannot update row + // INSERT IGNORE cannot update row. unique_ptr<KuduInsertIgnore> insert_ignore(client_table_->NewInsertIgnore()); ASSERT_OK(insert_ignore->mutable_row()->SetInt32("key", 1)); ASSERT_OK(insert_ignore->mutable_row()->SetInt32("int_val", 999)); ASSERT_OK(insert_ignore->mutable_row()->SetStringCopy("string_val", "hello world")); ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default", 999)); ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but results in no change - DoTestInsertIgnoreVerifyRows(client_table_, 1); + DoTestVerifyRows(client_table_, 1); } { - // INSERT IGNORE can insert new row + // INSERT IGNORE can insert new row. unique_ptr<KuduInsertIgnore> insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2)); ASSERT_OK(session->Apply(insert_ignore.release())); - DoTestInsertIgnoreVerifyRows(client_table_, 2); + DoTestVerifyRows(client_table_, 2); + } +} + +TEST_F(ClientTest, TestUpdateIgnore) { + shared_ptr<KuduSession> session = client_->NewSession(); + session->SetTimeoutMillis(10000); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); + + { + // UPDATE IGNORE results in no error on missing primary key. + unique_ptr<KuduUpdateIgnore> update_ignore(BuildTestUpdateIgnore(client_table_.get(), 1)); + ASSERT_OK(session->Apply(update_ignore.release())); + DoTestVerifyRows(client_table_, 0); + } + + { + unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1)); + ASSERT_OK(session->Apply(insert.release())); + DoTestVerifyRows(client_table_, 1); + } + + { + // UPDATE IGNORE can update row. 
+ unique_ptr<KuduUpdateIgnore> update_ignore(client_table_->NewUpdateIgnore()); + ASSERT_OK(update_ignore->mutable_row()->SetInt32("key", 1)); + ASSERT_OK(update_ignore->mutable_row()->SetInt32("int_val", 999)); + ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "hello world")); + ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default", 999)); + ASSERT_OK(session->Apply(update_ignore.release())); + + vector<string> rows; + KuduScanner scanner(client_table_.get()); + ASSERT_OK(ScanToStrings(&scanner, &rows)); + ASSERT_EQ(1, rows.size()); + ASSERT_EQ("(int32 key=1, int32 int_val=999, string string_val=\"hello world\", " + "int32 non_null_with_default=999)", rows[0]); + } +} + +TEST_F(ClientTest, TestDeleteIgnore) { + shared_ptr<KuduSession> session = client_->NewSession(); + session->SetTimeoutMillis(10000); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); + + { + unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1)); + ASSERT_OK(session->Apply(insert.release())); + DoTestVerifyRows(client_table_, 1); + } + + { + // DELETE IGNORE can delete row. + unique_ptr<KuduDeleteIgnore> delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1)); + ASSERT_OK(session->Apply(delete_ignore.release())); + DoTestVerifyRows(client_table_, 0); + } + + { + // DELETE IGNORE results in no error on missing primary key. 
+ unique_ptr<KuduDeleteIgnore> delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1)); + ASSERT_OK(session->Apply(delete_ignore.release())); + DoTestVerifyRows(client_table_, 0); } } diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 6c6bce4..181db49 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -967,10 +967,18 @@ KuduUpdate* KuduTable::NewUpdate() { return new KuduUpdate(shared_from_this()); } +KuduUpdateIgnore* KuduTable::NewUpdateIgnore() { + return new KuduUpdateIgnore(shared_from_this()); +} + KuduDelete* KuduTable::NewDelete() { return new KuduDelete(shared_from_this()); } +KuduDeleteIgnore* KuduTable::NewDeleteIgnore() { + return new KuduDeleteIgnore(shared_from_this()); +} + KuduClient* KuduTable::client() const { return data_->client_.get(); } diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index aa2a207..16193d3 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -81,6 +81,7 @@ namespace client { class KuduColumnarScanBatch; class KuduDelete; +class KuduDeleteIgnore; class KuduInsert; class KuduInsertIgnore; class KuduLoggingCallback; @@ -94,6 +95,7 @@ class KuduTableStatistics; class KuduTablet; class KuduTabletServer; class KuduUpdate; +class KuduUpdateIgnore; class KuduUpsert; class KuduValue; class KuduWriteOperation; @@ -1085,11 +1087,21 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> { /// KuduSession::Apply(). KuduUpdate* NewUpdate(); + /// @return New @c UPDATE_IGNORE operation for this table. It is the + /// caller's responsibility to free the result, unless it is passed to + /// KuduSession::Apply(). + KuduUpdateIgnore* NewUpdateIgnore(); + /// @return New @c DELETE operation for this table. It is the caller's /// responsibility to free the result, unless it is passed to /// KuduSession::Apply(). KuduDelete* NewDelete(); + /// @return New @c DELETE_IGNORE operation for this table. 
It is the + /// caller's responsibility to free the result, unless it is passed to + /// KuduSession::Apply(). + KuduDeleteIgnore* NewDeleteIgnore(); + /// Create a new comparison predicate. /// /// This method creates new instance of a comparison predicate which diff --git a/src/kudu/client/write_op.cc b/src/kudu/client/write_op.cc index ad32a93..ca47205 100644 --- a/src/kudu/client/write_op.cc +++ b/src/kudu/client/write_op.cc @@ -44,6 +44,8 @@ RowOperationsPB_Type ToInternalWriteType(KuduWriteOperation::Type type) { case KuduWriteOperation::DELETE: return RowOperationsPB_Type_DELETE; case KuduWriteOperation::UPSERT: return RowOperationsPB_Type_UPSERT; case KuduWriteOperation::INSERT_IGNORE: return RowOperationsPB_Type_INSERT_IGNORE; + case KuduWriteOperation::UPDATE_IGNORE: return RowOperationsPB_Type_UPDATE_IGNORE; + case KuduWriteOperation::DELETE_IGNORE: return RowOperationsPB_Type_DELETE_IGNORE; default: LOG(FATAL) << "Unexpected write operation type: " << type; } } @@ -110,6 +112,14 @@ KuduUpdate::KuduUpdate(const shared_ptr<KuduTable>& table) KuduUpdate::~KuduUpdate() {} +// UpdateIgnore ----------------------------------------------------------------- + +KuduUpdateIgnore::KuduUpdateIgnore(const shared_ptr<KuduTable>& table) + : KuduWriteOperation(table) { +} + +KuduUpdateIgnore::~KuduUpdateIgnore() {} + // Delete ----------------------------------------------------------------------- KuduDelete::KuduDelete(const shared_ptr<KuduTable>& table) @@ -118,6 +128,14 @@ KuduDelete::KuduDelete(const shared_ptr<KuduTable>& table) KuduDelete::~KuduDelete() {} +// DeleteIgnore ----------------------------------------------------------------- + +KuduDeleteIgnore::KuduDeleteIgnore(const shared_ptr<KuduTable>& table) + : KuduWriteOperation(table) { +} + +KuduDeleteIgnore::~KuduDeleteIgnore() {} + // Upsert ----------------------------------------------------------------------- KuduUpsert::KuduUpsert(const shared_ptr<KuduTable>& table) diff --git 
a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h index 0c0cb16..d639188 100644 --- a/src/kudu/client/write_op.h +++ b/src/kudu/client/write_op.h @@ -69,7 +69,9 @@ class KUDU_EXPORT KuduWriteOperation { UPDATE = 2, DELETE = 3, UPSERT = 4, - INSERT_IGNORE = 5 + INSERT_IGNORE = 5, + UPDATE_IGNORE = 6, + DELETE_IGNORE = 7 }; virtual ~KuduWriteOperation(); @@ -236,6 +238,31 @@ class KUDU_EXPORT KuduUpdate : public KuduWriteOperation { explicit KuduUpdate(const sp::shared_ptr<KuduTable>& table); }; +/// @brief A single row update ignore to be sent to the cluster, missing row errors are ignored. +/// +/// @pre An update ignore requires the key columns and at least one other column +/// in the schema to be set in the embedded KuduPartialRow object. +class KUDU_EXPORT KuduUpdateIgnore : public KuduWriteOperation { +public: + virtual ~KuduUpdateIgnore(); + + /// @copydoc KuduWriteOperation::ToString() + virtual std::string ToString() const OVERRIDE { return "UPDATE IGNORE " + row_.ToString(); } + +protected: + /// @cond PROTECTED_MEMBERS_DOCUMENTED + + /// @copydoc KuduWriteOperation::type() + virtual Type type() const OVERRIDE { + return UPDATE_IGNORE; + } + + /// @endcond + +private: + friend class KuduTable; + explicit KuduUpdateIgnore(const sp::shared_ptr<KuduTable>& table); +}; /// @brief A single row delete to be sent to the cluster. /// @@ -263,6 +290,32 @@ class KUDU_EXPORT KuduDelete : public KuduWriteOperation { explicit KuduDelete(const sp::shared_ptr<KuduTable>& table); }; +/// @brief A single row delete ignore to be sent to the cluster. +/// +/// @pre A delete ignore requires the key columns to be set in the embedded +/// KuduPartialRow object. 
+class KUDU_EXPORT KuduDeleteIgnore : public KuduWriteOperation { +public: + virtual ~KuduDeleteIgnore(); + + /// @copydoc KuduWriteOperation::ToString() + virtual std::string ToString() const OVERRIDE { return "DELETE IGNORE " + row_.ToString(); } + +protected: + /// @cond PROTECTED_MEMBERS_DOCUMENTED + + /// @copydoc KuduWriteOperation::type() + virtual Type type() const OVERRIDE { + return DELETE_IGNORE; + } + + /// @endcond + +private: + friend class KuduTable; + explicit KuduDeleteIgnore(const sp::shared_ptr<KuduTable>& table); +}; + } // namespace client } // namespace kudu diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc index fec5efd..34dccb7 100644 --- a/src/kudu/common/row_operations-test.cc +++ b/src/kudu/common/row_operations-test.cc @@ -125,7 +125,7 @@ void RowOperationsTest::CheckDecodeDoesntCrash(const Schema& client_schema, void RowOperationsTest::DoFuzzTest(const Schema& server_schema, const KuduPartialRow& row, int n_random_changes) { - for (int operation = 0; operation <= 9; operation++) { + for (int operation = 0; operation <= 11; operation++) { RowOperationsPB pb; RowOperationsPBEncoder enc(&pb); @@ -160,6 +160,12 @@ void RowOperationsTest::DoFuzzTest(const Schema& server_schema, case 9: enc.Add(RowOperationsPB::INSERT_IGNORE, row); break; + case 10: + enc.Add(RowOperationsPB::UPDATE_IGNORE, row); + break; + case 11: + enc.Add(RowOperationsPB::DELETE_IGNORE, row); + break; } const Schema* client_schema = row.schema(); @@ -826,7 +832,8 @@ void CheckExceedCellLimit(const Schema& client_schema, // Fill the row. KuduPartialRow row(&client_schema); for (size_t i = 0; i < client_schema.num_columns(); ++i) { - if (op_type == RowOperationsPB::DELETE && i >= client_schema.num_key_columns()) { + if ((op_type == RowOperationsPB::DELETE || op_type == RowOperationsPB::DELETE_IGNORE) && + i >= client_schema.num_key_columns()) { // DELETE should not have a value for non-key column. 
break; } @@ -856,6 +863,9 @@ void CheckExceedCellLimit(const Schema& client_schema, case RowOperationsPB::UPDATE: case RowOperationsPB::DELETE: case RowOperationsPB::UPSERT: + case RowOperationsPB::INSERT_IGNORE: + case RowOperationsPB::UPDATE_IGNORE: + case RowOperationsPB::DELETE_IGNORE: s = decoder.DecodeOperations<WRITE_OPS>(&ops); break; case RowOperationsPB::SPLIT_ROW: @@ -870,7 +880,8 @@ void CheckExceedCellLimit(const Schema& client_schema, } ASSERT_OK(s); for (const auto& op : ops) { - ASSERT_EQ(op.result.CodeAsString(), expect_status.CodeAsString()); + ASSERT_EQ(op.result.CodeAsString(), + expect_status.CodeAsString()) << op.result.message().ToString(); ASSERT_STR_CONTAINS(op.result.ToString(), expect_msg); } } @@ -880,6 +891,7 @@ void CheckInsertUpsertExceedCellLimit(const Schema& client_schema, const Status& expect_status, const string& expect_msg) { for (auto op_type : { RowOperationsPB::INSERT, + RowOperationsPB::INSERT_IGNORE, RowOperationsPB::UPSERT }) { NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type, expect_status, expect_msg)); } @@ -890,7 +902,9 @@ void CheckUpdateDeleteExceedCellLimit(const Schema& client_schema, const Status& expect_status, const string& expect_msg) { for (auto op_type : { RowOperationsPB::UPDATE, - RowOperationsPB::DELETE }) { + RowOperationsPB::UPDATE_IGNORE, + RowOperationsPB::DELETE, + RowOperationsPB::DELETE_IGNORE }) { NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type, expect_status, expect_msg)); } } diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc index b82ace6..363a424 100644 --- a/src/kudu/common/row_operations.cc +++ b/src/kudu/common/row_operations.cc @@ -70,7 +70,9 @@ string DecodedRowOperation::ToString(const Schema& schema) const { case RowOperationsPB::UPSERT: return "UPSERT " + schema.DebugRow(ConstContiguousRow(&schema, row_data)); case RowOperationsPB::UPDATE: + case RowOperationsPB::UPDATE_IGNORE: case RowOperationsPB::DELETE: + case 
RowOperationsPB::DELETE_IGNORE: return Substitute("MUTATE $0 $1", schema.DebugRowKey(ConstContiguousRow(&schema, row_data)), changelist.ToString(schema)); @@ -542,7 +544,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m // update to perform. // For DELETE, we expect no other columns to be set (and we verify that). Status row_status; - if (op->type == RowOperationsPB::UPDATE) { + if (op->type == RowOperationsPB::UPDATE || op->type == RowOperationsPB::UPDATE_IGNORE) { faststring buf; RowChangeListEncoder rcl_encoder(&buf); @@ -586,7 +588,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m memcpy(rcl_in_arena, buf.data(), buf.size()); op->changelist = RowChangeList(Slice(rcl_in_arena, buf.size())); } - } else if (op->type == RowOperationsPB::DELETE) { + } else if (op->type == RowOperationsPB::DELETE || op->type == RowOperationsPB::DELETE_IGNORE) { // Ensure that no other columns are set. for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) { if (PREDICT_FALSE(BitmapTest(client_isset_map, client_col_idx))) { @@ -700,7 +702,9 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>( RETURN_NOT_OK(DecodeInsertOrUpsert(prototype_row_storage, mapping, op)); break; case RowOperationsPB::UPDATE: + case RowOperationsPB::UPDATE_IGNORE: case RowOperationsPB::DELETE: + case RowOperationsPB::DELETE_IGNORE: RETURN_NOT_OK(DecodeUpdateOrDelete(mapping, op)); break; default: diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h index af3a7fd..16b7ba4 100644 --- a/src/kudu/common/row_operations.h +++ b/src/kudu/common/row_operations.h @@ -76,7 +76,7 @@ struct DecodedRowOperation { RowOperationsPB::Type type; // For INSERT, INSERT_IGNORE, or UPSERT, the whole projected row. - // For UPDATE or DELETE, the row key. + // For UPDATE, UPDATE_IGNORE, DELETE, or DELETE_IGNORE, the row key. 
const uint8_t* row_data; // For INSERT or UPDATE, a bitmap indicating which of the cells were diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto index 837acc0..f65ddea 100644 --- a/src/kudu/common/wire_protocol.proto +++ b/src/kudu/common/wire_protocol.proto @@ -195,6 +195,8 @@ message RowOperationsPB { DELETE = 3; UPSERT = 5; INSERT_IGNORE = 10; + UPDATE_IGNORE = 11; + DELETE_IGNORE = 12; // Used when specifying split rows on table creation. SPLIT_ROW = 4; diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc index 1ed8252..36e3825 100644 --- a/src/kudu/integration-tests/fuzz-itest.cc +++ b/src/kudu/integration-tests/fuzz-itest.cc @@ -108,7 +108,9 @@ enum TestOpType { TEST_UPSERT, TEST_UPSERT_PK_ONLY, TEST_UPDATE, + TEST_UPDATE_IGNORE, TEST_DELETE, + TEST_DELETE_IGNORE, TEST_FLUSH_OPS, TEST_FLUSH_TABLET, TEST_FLUSH_DELTAS, @@ -130,7 +132,9 @@ const char* TestOpType_names[] = { "TEST_UPSERT", "TEST_UPSERT_PK_ONLY", "TEST_UPDATE", + "TEST_UPDATE_IGNORE", "TEST_DELETE", + "TEST_DELETE_IGNORE", "TEST_FLUSH_OPS", "TEST_FLUSH_TABLET", "TEST_FLUSH_DELTAS", @@ -180,7 +184,9 @@ struct TestOp { case TEST_UPSERT: case TEST_UPSERT_PK_ONLY: case TEST_UPDATE: + case TEST_UPDATE_IGNORE: case TEST_DELETE: + case TEST_DELETE_IGNORE: case TEST_SCAN_AT_TIMESTAMP: return strings::Substitute("{$0, $1}", TestOpType_names[type], val); case TEST_DIFF_SCAN: @@ -224,7 +230,9 @@ const vector<TestOpType> kAllOps {TEST_INSERT, TEST_UPSERT, TEST_UPSERT_PK_ONLY, TEST_UPDATE, + TEST_UPDATE_IGNORE, TEST_DELETE, + TEST_DELETE_IGNORE, TEST_FLUSH_OPS, TEST_FLUSH_TABLET, TEST_FLUSH_DELTAS, @@ -239,6 +247,7 @@ const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY, TEST_INSERT_IGNORE_PK_ONLY, TEST_UPSERT_PK_ONLY, TEST_DELETE, + TEST_DELETE_IGNORE, TEST_FLUSH_OPS, TEST_FLUSH_TABLET, TEST_FLUSH_DELTAS, @@ -372,10 +381,15 @@ class FuzzTest : public KuduTest { // Adds an update of the given key/value pair to 'ops', returning 
the new contents // of the row. - ExpectedKeyValueRow MutateRow(int key, uint32_t new_val) { + ExpectedKeyValueRow MutateRow(int key, uint32_t new_val, TestOpType type) { ExpectedKeyValueRow ret; - unique_ptr<KuduUpdate> update(table_->NewUpdate()); - KuduPartialRow* row = update->mutable_row(); + unique_ptr<KuduWriteOperation> op; + if (type == TEST_UPDATE_IGNORE) { + op.reset(table_->NewUpdateIgnore()); + } else { + op.reset(table_->NewUpdate()); + } + KuduPartialRow* row = op->mutable_row(); CHECK_OK(row->SetInt32(0, key)); ret.key = key; if (new_val & 1) { @@ -384,17 +398,22 @@ class FuzzTest : public KuduTest { CHECK_OK(row->SetInt32(1, new_val)); ret.val = new_val; } - CHECK_OK(session_->Apply(update.release())); + CHECK_OK(session_->Apply(op.release())); return ret; } // Adds a delete of the given row to 'ops', returning boost::none (indicating that // the row no longer exists). - optional<ExpectedKeyValueRow> DeleteRow(int key) { - unique_ptr<KuduDelete> del(table_->NewDelete()); - KuduPartialRow* row = del->mutable_row(); + optional<ExpectedKeyValueRow> DeleteRow(int key, TestOpType type) { + unique_ptr<KuduWriteOperation> op; + if (type == TEST_DELETE_IGNORE) { + op.reset(table_->NewDeleteIgnore()); + } else { + op.reset(table_->NewDelete()); + } + KuduPartialRow* row = op->mutable_row(); CHECK_OK(row->SetInt32(0, key)); - CHECK_OK(session_->Apply(del.release())); + CHECK_OK(session_->Apply(op.release())); return boost::none; } @@ -708,7 +727,9 @@ bool IsMutation(const TestOpType& op) { case TEST_UPSERT: case TEST_UPSERT_PK_ONLY: case TEST_UPDATE: + case TEST_UPDATE_IGNORE: case TEST_DELETE: + case TEST_DELETE_IGNORE: return true; default: return false; @@ -772,20 +793,39 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) { break; case TEST_UPDATE: if (!exists[row_key]) continue; - ops->emplace_back(TEST_UPDATE, row_key); + ops->emplace_back(r, row_key); ops_pending = true; if (!data_in_mrs) { data_in_dms = true; } break; + case 
TEST_UPDATE_IGNORE: + ops->emplace_back(r, row_key); + ops_pending = true; + // If it does exist, this will act like an update and put it into + // a DMS. + if (exists[row_key] && !data_in_mrs) { + data_in_dms = true; + } + break; case TEST_DELETE: if (!exists[row_key]) continue; - ops->emplace_back(TEST_DELETE, row_key); + ops->emplace_back(r, row_key); ops_pending = true; - exists[row_key] = false; if (!data_in_mrs) { data_in_dms = true; } + exists[row_key] = false; + break; + case TEST_DELETE_IGNORE: + ops->emplace_back(r, row_key); + ops_pending = true; + // If it does exist, this will act like a delete and put it into + // a DMS. + if (exists[row_key] && !data_in_mrs) { + data_in_dms = true; + } + exists[row_key] = false; break; case TEST_FLUSH_OPS: if (ops_pending) { @@ -887,10 +927,16 @@ void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) { case TEST_UPDATE: CHECK(exists[test_op.val]) << "invalid case: updating non-existing row"; break; + case TEST_UPDATE_IGNORE: + // No change to `exists[test_op.val]`. + break; case TEST_DELETE: CHECK(exists[test_op.val]) << "invalid case: deleting non-existing row"; exists[test_op.val] = false; break; + case TEST_DELETE_IGNORE: + exists[test_op.val] = false; + break; default: break; } @@ -944,15 +990,39 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops, break; } case TEST_UPDATE: + case TEST_UPDATE_IGNORE: { + // An update ignore on a row that doesn't exist will be dropped server-side. + // We must do the same. + if (test_op.type == TEST_UPDATE_IGNORE && !pending_val[test_op.val]) { + // Still call MutateRow to apply the UPDATE_IGNORE operations to the session. + // However don't adjust the pending values given the operation will be ignored. 
+ for (int j = 0; j < update_multiplier; j++) { + MutateRow(test_op.val, i++, test_op.type); + } + break; + } + for (int j = 0; j < update_multiplier; j++) { - pending_val[test_op.val] = MutateRow(test_op.val, i++); + pending_val[test_op.val] = MutateRow(test_op.val, i++, test_op.type); pending_redos.emplace_back(UPDATE, test_op.val, pending_val[test_op.val]->val); } break; + } case TEST_DELETE: - pending_val[test_op.val] = DeleteRow(test_op.val); + case TEST_DELETE_IGNORE: { + // A delete ignore on a row that doesn't exist will be dropped server-side. + // We must do the same. + if (test_op.type == TEST_DELETE_IGNORE && !pending_val[test_op.val]) { + // Still call DeleteRow to apply the DELETE_IGNORE operation to the session. + // However don't adjust the pending values given the operation will be ignored. + DeleteRow(test_op.val, test_op.type); + break; + } + + pending_val[test_op.val] = DeleteRow(test_op.val, test_op.type); pending_redos.emplace_back(DELETE, test_op.val, boost::none); break; + } case TEST_FLUSH_OPS: { FlushSessionOrDie(session_); cur_val = pending_val; diff --git a/src/kudu/tablet/local_tablet_writer.h b/src/kudu/tablet/local_tablet_writer.h index ad6377a..f7dfcfe 100644 --- a/src/kudu/tablet/local_tablet_writer.h +++ b/src/kudu/tablet/local_tablet_writer.h @@ -79,10 +79,18 @@ class LocalTabletWriter { return Write(RowOperationsPB::DELETE, row); } + Status DeleteIgnore(const KuduPartialRow& row) { + return Write(RowOperationsPB::DELETE_IGNORE, row); + } + Status Update(const KuduPartialRow& row) { return Write(RowOperationsPB::UPDATE, row); } + Status UpdateIgnore(const KuduPartialRow& row) { + return Write(RowOperationsPB::UPDATE_IGNORE, row); + } + // Perform a write against the local tablet. // Returns a bad Status if the applied operation had a per-row error. 
Status Write(RowOperationsPB::Type type, diff --git a/src/kudu/tablet/ops/op.cc b/src/kudu/tablet/ops/op.cc index 78be15d..2f247c9 100644 --- a/src/kudu/tablet/ops/op.cc +++ b/src/kudu/tablet/ops/op.cc @@ -76,7 +76,9 @@ OpMetrics::OpMetrics() insert_ignore_errors(0), successful_upserts(0), successful_updates(0), + update_ignore_errors(0), successful_deletes(0), + delete_ignore_errors(0), commit_wait_duration_usec(0) { } @@ -85,7 +87,9 @@ void OpMetrics::Reset() { insert_ignore_errors = 0; successful_upserts = 0; successful_updates = 0; + update_ignore_errors = 0; successful_deletes = 0; + delete_ignore_errors = 0; commit_wait_duration_usec = 0; } diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h index c9a6237..5463df6 100644 --- a/src/kudu/tablet/ops/op.h +++ b/src/kudu/tablet/ops/op.h @@ -39,7 +39,6 @@ #include "kudu/tserver/tserver.pb.h" #include "kudu/util/auto_release_pool.h" #include "kudu/util/countdown_latch.h" -#include "kudu/util/locks.h" #include "kudu/util/memory/arena.h" #include "kudu/util/status.h" @@ -63,7 +62,9 @@ struct OpMetrics { int insert_ignore_errors; int successful_upserts; int successful_updates; + int update_ignore_errors; int successful_deletes; + int delete_ignore_errors; uint64_t commit_wait_duration_usec; }; diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc index ec71baf..4cc7fdf 100644 --- a/src/kudu/tablet/ops/write_op.cc +++ b/src/kudu/tablet/ops/write_op.cc @@ -103,6 +103,7 @@ void AddWritePrivilegesForRowOperations(const RowOperationsPB::Type& op_type, WritePrivileges* privileges) { switch (op_type) { case RowOperationsPB::INSERT: + case RowOperationsPB::INSERT_IGNORE: InsertIfNotPresent(privileges, WritePrivilegeType::INSERT); break; case RowOperationsPB::UPSERT: @@ -110,9 +111,11 @@ void AddWritePrivilegesForRowOperations(const RowOperationsPB::Type& op_type, InsertIfNotPresent(privileges, WritePrivilegeType::UPDATE); break; case RowOperationsPB::UPDATE: + case 
RowOperationsPB::UPDATE_IGNORE: InsertIfNotPresent(privileges, WritePrivilegeType::UPDATE); break; case RowOperationsPB::DELETE: + case RowOperationsPB::DELETE_IGNORE: InsertIfNotPresent(privileges, WritePrivilegeType::DELETE); break; default: @@ -271,7 +274,9 @@ void WriteOp::Finish(OpResult result) { metrics->insert_ignore_errors->IncrementBy(state_->metrics().insert_ignore_errors); metrics->rows_upserted->IncrementBy(state_->metrics().successful_upserts); metrics->rows_updated->IncrementBy(state_->metrics().successful_updates); + metrics->update_ignore_errors->IncrementBy(state_->metrics().update_ignore_errors); metrics->rows_deleted->IncrementBy(state_->metrics().successful_deletes); + metrics->delete_ignore_errors->IncrementBy(state_->metrics().delete_ignore_errors); if (type() == consensus::LEADER) { if (state()->external_consistency_mode() == COMMIT_WAIT) { @@ -420,6 +425,7 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) { } switch (op.decoded_op.type) { case RowOperationsPB::INSERT: + DCHECK(!op.error_ignored); op_metrics_.successful_inserts++; break; case RowOperationsPB::INSERT_IGNORE: @@ -430,14 +436,31 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) { } break; case RowOperationsPB::UPSERT: + DCHECK(!op.error_ignored); op_metrics_.successful_upserts++; break; case RowOperationsPB::UPDATE: + DCHECK(!op.error_ignored); op_metrics_.successful_updates++; break; + case RowOperationsPB::UPDATE_IGNORE: + if (op.error_ignored) { + op_metrics_.update_ignore_errors++; + } else { + op_metrics_.successful_updates++; + } + break; case RowOperationsPB::DELETE: + DCHECK(!op.error_ignored); op_metrics_.successful_deletes++; break; + case RowOperationsPB::DELETE_IGNORE: + if (op.error_ignored) { + op_metrics_.delete_ignore_errors++; + } else { + op_metrics_.successful_deletes++; + } + break; case RowOperationsPB::UNKNOWN: case RowOperationsPB::SPLIT_ROW: case RowOperationsPB::RANGE_LOWER_BOUND: diff --git a/src/kudu/tablet/tablet-test.cc 
b/src/kudu/tablet/tablet-test.cc index f0e9c92..891e379 100644 --- a/src/kudu/tablet/tablet-test.cc +++ b/src/kudu/tablet/tablet-test.cc @@ -800,6 +800,70 @@ TYPED_TEST(TestTablet, TestInsertIgnore) { EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1011, false) }, rows); } +TYPED_TEST(TestTablet, TestUpdateIgnore) { + LocalTabletWriter writer(this->tablet().get(), &this->client_schema_); + KuduPartialRow row(&this->client_schema_); + vector<string> rows; + + // update ignore a missing row, operation should succeed. + this->setup_.BuildRow(&row, 0, 1000); + vector<LocalTabletWriter::RowOp> ops; + ops.emplace_back(LocalTabletWriter::RowOp(RowOperationsPB::UPDATE_IGNORE, &row)); + ASSERT_OK(writer.WriteBatch(ops)); + ASSERT_OK(this->IterateToStringList(&rows)); + ASSERT_EQ(0, rows.size()); + + // insert a row that can be updated. + ops.clear(); + this->setup_.BuildRow(&row, 0, 1000); + ops.emplace_back(LocalTabletWriter::RowOp(RowOperationsPB::INSERT, &row)); + ASSERT_OK(writer.WriteBatch(ops)); + ASSERT_OK(this->IterateToStringList(&rows)); + ASSERT_EQ(1, rows.size()); + EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1000, false) }, rows); + + // update ignore an existing row, implements normal update. + ops.clear(); + this->setup_.BuildRow(&row, 0, 1011); + ops.emplace_back(LocalTabletWriter::RowOp(RowOperationsPB::UPDATE_IGNORE, &row)); + ASSERT_OK(writer.WriteBatch(ops)); + ASSERT_OK(this->IterateToStringList(&rows)); + ASSERT_EQ(1, rows.size()); + EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1011, false) }, rows); +} + +TYPED_TEST(TestTablet, TestDeleteIgnore) { + LocalTabletWriter writer(this->tablet().get(), &this->client_schema_); + KuduPartialRow row(&this->client_schema_); + vector<string> rows; + + // delete ignore a missing row, operation should succeed. 
+ this->setup_.BuildRowKey(&row, 0); + vector<LocalTabletWriter::RowOp> ops; + ops.emplace_back(LocalTabletWriter::RowOp(RowOperationsPB::DELETE_IGNORE, &row)); + ASSERT_OK(writer.WriteBatch(ops)); + ASSERT_OK(this->IterateToStringList(&rows)); + ASSERT_EQ(0, rows.size()); + + // insert a row that can be deleted. + ops.clear(); + this->setup_.BuildRow(&row, 0, 1000); + ops.emplace_back(LocalTabletWriter::RowOp(RowOperationsPB::INSERT, &row)); + ASSERT_OK(writer.WriteBatch(ops)); + ASSERT_OK(this->IterateToStringList(&rows)); + ASSERT_EQ(1, rows.size()); + EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1000, false) }, rows); + + // delete ignore an existing row, implements normal delete. + ops.clear(); + KuduPartialRow delete_row(&this->client_schema_); + this->setup_.BuildRowKey(&delete_row, 0); + ops.emplace_back(LocalTabletWriter::RowOp(RowOperationsPB::DELETE_IGNORE, &delete_row)); + ASSERT_OK(writer.WriteBatch(ops)); + ASSERT_OK(this->IterateToStringList(&rows)); + ASSERT_EQ(0, rows.size()); +} + TYPED_TEST(TestTablet, TestUpsert) { vector<string> rows; const auto& upserts_as_updates = this->tablet()->metrics()->upserts_as_updates; diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 3198782..badd25b 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -626,7 +626,9 @@ Status Tablet::ValidateOp(const RowOp& op) { return ValidateInsertOrUpsertUnlocked(op); case RowOperationsPB::UPDATE: + case RowOperationsPB::UPDATE_IGNORE: case RowOperationsPB::DELETE: + case RowOperationsPB::DELETE_IGNORE: return ValidateMutateUnlocked(op); default: @@ -824,10 +826,10 @@ vector<RowSet*> Tablet::FindRowSetsToCheck(const RowOp* op, Status Tablet::MutateRowUnlocked(const IOContext* io_context, WriteOpState *op_state, - RowOp* mutate, + RowOp* op, ProbeStats* stats) { - DCHECK(mutate->checked_present); - DCHECK(mutate->valid); + DCHECK(op->checked_present); + DCHECK(op->valid); auto* result = 
google::protobuf::Arena::CreateMessage<OperationResultPB>( op_state->pb_arena()); @@ -836,23 +838,38 @@ Status Tablet::MutateRowUnlocked(const IOContext* io_context, // If we found the row in any existing RowSet, mutate it there. Otherwise // attempt to mutate in the MRS. - RowSet* rs_to_attempt = mutate->present_in_rowset ? - mutate->present_in_rowset : comps->memrowset.get(); + RowSet* rs_to_attempt = op->present_in_rowset ? + op->present_in_rowset : comps->memrowset.get(); Status s = rs_to_attempt->MutateRow(ts, - *mutate->key_probe, - mutate->decoded_op.changelist, + *op->key_probe, + op->decoded_op.changelist, op_state->op_id(), io_context, stats, result); if (PREDICT_TRUE(s.ok())) { - mutate->SetMutateSucceeded(result); + op->SetMutateSucceeded(result); } else { if (s.IsNotFound()) { - // Replace internal error messages with one more suitable for users. - s = Status::NotFound("key not found"); + RowOperationsPB_Type op_type = op->decoded_op.type; + switch (op_type) { + case RowOperationsPB::UPDATE_IGNORE: + case RowOperationsPB::DELETE_IGNORE: + s = Status::OK(); + op->SetErrorIgnored(); + break; + case RowOperationsPB::UPDATE: + case RowOperationsPB::DELETE: + // Replace internal error messages with one more suitable for users. 
+ s = Status::NotFound("key not found"); + op->SetFailed(s); + break; + default: + LOG(FATAL) << "Unknown operation type: " << op_type; + } + } else { + op->SetFailed(s); } - mutate->SetFailed(s); } return s; } @@ -1115,7 +1132,9 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context, return s; case RowOperationsPB::UPDATE: + case RowOperationsPB::UPDATE_IGNORE: case RowOperationsPB::DELETE: + case RowOperationsPB::DELETE_IGNORE: s = MutateRowUnlocked(io_context, op_state, row_op, stats); if (s.IsNotFound()) { return Status::OK(); diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 728dcea..24219bd 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -589,10 +589,10 @@ class Tablet { RowOp* op, ProbeStats* stats); - // Same as above, but for UPDATE. + // Same as above, but for UPDATE, UPDATE_IGNORE, DELETE, or DELETE_IGNORE operations. Status MutateRowUnlocked(const fs::IOContext* io_context, WriteOpState *op_state, - RowOp* mutate, + RowOp* op, ProbeStats* stats); // In the case of an UPSERT against a duplicate row, converts the UPSERT diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index a6298cf..22e8fb5 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -445,8 +445,9 @@ class TabletBootstrap { // Number of REPLICATE messages for which a matching COMMIT was found. int ops_committed; - // Number inserts/mutations seen and ignored. Note inserts_ignored does not refer - // to the INSERT_IGNORE operation. It refers to inserts ignored during log replay. + // Number inserts/mutations seen and ignored. Note inserts_ignored and mutations_ignored + // do not refer to the INSERT_IGNORE, UPDATE_IGNORE, and DELETE_IGNORE operations. + // They refer to inserts and mutations ignored during log replay. 
int inserts_seen, inserts_ignored; int mutations_seen, mutations_ignored; @@ -1642,7 +1643,9 @@ Status TabletBootstrap::ApplyOperations(const IOContext* io_context, break; } case RowOperationsPB::UPDATE: - case RowOperationsPB::DELETE: { + case RowOperationsPB::UPDATE_IGNORE: + case RowOperationsPB::DELETE: + case RowOperationsPB::DELETE_IGNORE: { stats_.mutations_seen++; if (op->has_result()) { stats_.mutations_ignored++; diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc index fd98049..b52065f 100644 --- a/src/kudu/tablet/tablet_metrics.cc +++ b/src/kudu/tablet/tablet_metrics.cc @@ -44,10 +44,20 @@ METRIC_DEFINE_counter(tablet, rows_updated, "Rows Updated", kudu::MetricUnit::kRows, "Number of row update operations performed on this tablet since service start", kudu::MetricLevel::kInfo); +METRIC_DEFINE_counter(tablet, update_ignore_errors, "Update Ignore Errors", + kudu::MetricUnit::kRows, + "Number of update ignore operations for this tablet which were " + "ignored due to an error since service start", + kudu::MetricLevel::kDebug); METRIC_DEFINE_counter(tablet, rows_deleted, "Rows Deleted", kudu::MetricUnit::kRows, "Number of row delete operations performed on this tablet since service start", kudu::MetricLevel::kInfo); +METRIC_DEFINE_counter(tablet, delete_ignore_errors, "Delete Ignore Errors", + kudu::MetricUnit::kRows, + "Number of delete ignore operations for this tablet which were " + "ignored due to an error since service start", + kudu::MetricLevel::kDebug); METRIC_DEFINE_counter(tablet, insertions_failed_dup_key, "Duplicate Key Inserts", kudu::MetricUnit::kRows, @@ -364,6 +374,8 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity) MINIT(rows_updated), MINIT(rows_deleted), MINIT(insert_ignore_errors), + MINIT(update_ignore_errors), + MINIT(delete_ignore_errors), MINIT(insertions_failed_dup_key), MINIT(upserts_as_updates), MINIT(scanner_rows_returned), diff --git a/src/kudu/tablet/tablet_metrics.h 
b/src/kudu/tablet/tablet_metrics.h index 9081fdc..6be2e70 100644 --- a/src/kudu/tablet/tablet_metrics.h +++ b/src/kudu/tablet/tablet_metrics.h @@ -49,6 +49,8 @@ struct TabletMetrics { scoped_refptr<Counter> rows_updated; scoped_refptr<Counter> rows_deleted; scoped_refptr<Counter> insert_ignore_errors; + scoped_refptr<Counter> update_ignore_errors; + scoped_refptr<Counter> delete_ignore_errors; scoped_refptr<Counter> insertions_failed_dup_key; scoped_refptr<Counter> upserts_as_updates; diff --git a/src/kudu/tablet/tablet_random_access-test.cc b/src/kudu/tablet/tablet_random_access-test.cc index 34ddd39..9fd0317 100644 --- a/src/kudu/tablet/tablet_random_access-test.cc +++ b/src/kudu/tablet/tablet_random_access-test.cc @@ -117,31 +117,45 @@ class TestRandomAccess : public KuduTabletTest { vector<LocalTabletWriter::RowOp> pending; for (int i = 0; i < 3; i++) { int new_val = rand(); - int r = rand() % 3; if (cur_val == boost::none) { - // If there is no row, then randomly insert or upsert. - switch (r) { + // If there is no row, then randomly insert, insert ignore, + // update ignore, delete ignore, or upsert. + switch (rand() % 5) { case 1: cur_val = InsertRow(key, new_val, &pending); break; case 2: cur_val = InsertIgnoreRow(key, new_val, &pending); break; + case 3: + UpdateIgnoreRow(key, new_val, cur_val, &pending); // won't change current value + break; + case 4: + DeleteIgnoreRow(key, &pending); // won't change current value + break; default: cur_val = UpsertRow(key, new_val, cur_val, &pending); } } else { if (new_val % (FLAGS_update_delete_ratio + 1) == 0) { - cur_val = DeleteRow(key, &pending); + // Randomly choose between delete and delete ignore. + if (rand() % 2 == 0) { + cur_val = DeleteRow(key, &pending); + } else { + cur_val = DeleteIgnoreRow(key, &pending); + } } else { // If row already exists, randomly choose between an update, - // upsert, and insert ignore. - switch (r) { + // update ignore, insert ignore, and upsert. 
+ switch (rand() % 4) { case 1: - cur_val = MutateRow(key, new_val, cur_val, &pending); + cur_val = UpdateRow(key, new_val, cur_val, &pending); break; case 2: - InsertIgnoreRow(key, new_val, &pending); // won't change existing value + cur_val = UpdateIgnoreRow(key, new_val, cur_val, &pending); + break; + case 3: + InsertIgnoreRow(key, new_val, &pending); // won't change current value break; default: cur_val = UpsertRow(key, new_val, cur_val, &pending); @@ -213,13 +227,21 @@ class TestRandomAccess : public KuduTabletTest { } // Adds an update of the given key/value pair to 'ops', returning the expected value - optional<ExpectedKeyValueRow> MutateRow(int key, + optional<ExpectedKeyValueRow> UpdateRow(int key, uint32_t new_val, const optional<ExpectedKeyValueRow>& old_row, vector<LocalTabletWriter::RowOp>* ops) { return DoRowOp(RowOperationsPB::UPDATE, key, new_val, old_row, ops); } + // Adds an update of the given key/value pair to 'ops', returning the expected value + optional<ExpectedKeyValueRow> UpdateIgnoreRow(int key, + uint32_t new_val, + const optional<ExpectedKeyValueRow>& old_row, + vector<LocalTabletWriter::RowOp>* ops) { + return DoRowOp(RowOperationsPB::UPDATE_IGNORE, key, new_val, old_row, ops); + } + optional<ExpectedKeyValueRow> DoRowOp(RowOperationsPB::Type type, int key, int val, @@ -234,6 +256,7 @@ class TestRandomAccess : public KuduTabletTest { switch (type) { case RowOperationsPB::UPSERT: case RowOperationsPB::UPDATE: + case RowOperationsPB::UPDATE_IGNORE: case RowOperationsPB::INSERT: case RowOperationsPB::INSERT_IGNORE: switch (val % 2) { @@ -247,7 +270,8 @@ class TestRandomAccess : public KuduTabletTest { break; } - if ((type != RowOperationsPB::UPDATE) && (val % 3 == 1)) { + if ((type != RowOperationsPB::UPDATE && + type != RowOperationsPB::UPDATE_IGNORE) && (val % 3 == 1)) { // Don't set the value. In the case of an INSERT or an UPSERT with no pre-existing // row, this should default to NULL. 
Otherwise it should remain set to whatever it // was previously set to. @@ -261,6 +285,7 @@ class TestRandomAccess : public KuduTabletTest { } break; case RowOperationsPB::DELETE: + case RowOperationsPB::DELETE_IGNORE: ret = boost::none; break; default: @@ -270,7 +295,6 @@ class TestRandomAccess : public KuduTabletTest { return ret; } - // Adds a delete of the given row to 'ops', returning an empty string (indicating that // the row no longer exists). optional<ExpectedKeyValueRow> DeleteRow(int key, vector<LocalTabletWriter::RowOp>* ops) { @@ -280,6 +304,15 @@ class TestRandomAccess : public KuduTabletTest { return boost::none; } + // Adds a delete ignore of the given row to 'ops', returning an empty string (indicating that + // the row no longer exists). + optional<ExpectedKeyValueRow> DeleteIgnoreRow(int key, vector<LocalTabletWriter::RowOp>* ops) { + unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_)); + CHECK_OK(row->SetInt32(0, key)); + ops->push_back(LocalTabletWriter::RowOp(RowOperationsPB::DELETE_IGNORE, row.release())); + return boost::none; + } + // Random-read the given row, returning its current value. // If the row doesn't exist, returns boost::none. 
optional<ExpectedKeyValueRow> GetRow(int key) { diff --git a/src/kudu/tserver/tablet_server_authorization-test.cc b/src/kudu/tserver/tablet_server_authorization-test.cc index ced797b..0f8ab03 100644 --- a/src/kudu/tserver/tablet_server_authorization-test.cc +++ b/src/kudu/tserver/tablet_server_authorization-test.cc @@ -1191,6 +1191,9 @@ class WritePrivilegeAuthzTest : public AuthzTabletServerTestBase { RowOperationsPB::INSERT, RowOperationsPB::UPDATE, RowOperationsPB::UPSERT, + RowOperationsPB::INSERT_IGNORE, + RowOperationsPB::UPDATE_IGNORE, + RowOperationsPB::DELETE_IGNORE, }; RowOpTypes types; types.reset(SelectRandomSubset< @@ -1226,12 +1229,24 @@ TEST_F(WritePrivilegeAuthzTest, TestSingleWriteOperations) { } { vector<WriteOpDescriptor> batch({ + { RowOperationsPB::INSERT_IGNORE, /*key=*/0, /*val=*/0 } + }); + NO_FATALS(CheckWritePrivileges(batch, WritePrivileges{ WritePrivilegeType::INSERT })); + } + { + vector<WriteOpDescriptor> batch({ { RowOperationsPB::UPDATE, /*key=*/0, /*val=*/1234 } }); NO_FATALS(CheckWritePrivileges(batch, WritePrivileges{ WritePrivilegeType::UPDATE })); } { vector<WriteOpDescriptor> batch({ + { RowOperationsPB::UPDATE_IGNORE, /*key=*/0, /*val=*/1234 } + }); + NO_FATALS(CheckWritePrivileges(batch, WritePrivileges{ WritePrivilegeType::UPDATE })); + } + { + vector<WriteOpDescriptor> batch({ { RowOperationsPB::UPSERT, /*key=*/0, /*val=*/3465 } }); NO_FATALS(CheckWritePrivileges(batch, WritePrivileges{ WritePrivilegeType::INSERT, @@ -1243,6 +1258,12 @@ TEST_F(WritePrivilegeAuthzTest, TestSingleWriteOperations) { }); NO_FATALS(CheckWritePrivileges(batch, WritePrivileges{ WritePrivilegeType::DELETE })); } + { + vector<WriteOpDescriptor> batch({ + { RowOperationsPB::DELETE_IGNORE, /*key=*/0, /*val=*/0 } + }); + NO_FATALS(CheckWritePrivileges(batch, WritePrivileges{ WritePrivilegeType::DELETE })); + } } // Like the above test, but sent in a batch.
