This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fcfe6dbb3466df0edf368f5d7257a7766f9a1dc5 Author: Andrew Wong <[email protected]> AuthorDate: Mon Apr 19 13:12:45 2021 -0700 [tests] enable using txns in TestWorkload I have an upcoming test in which it'd be convenient to have an easy means to generate a transactional workload. This patch introduces some options to the TestWorkload class that satisfy this need: - set_begin_txn() - set_commit_txn() - set_rollback_txn() - set_txn_id(int64_t txn_id) Change-Id: Ia81daac8fcfd552603a0302c3d9aa411ea082ab1 Reviewed-on: http://gerrit.cloudera.org:8080/17326 Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Alexey Serbin <[email protected]> --- src/kudu/integration-tests/test_workload.cc | 57 ++++++++- src/kudu/integration-tests/test_workload.h | 56 +++++++- src/kudu/integration-tests/txn_write_ops-itest.cc | 148 +++++++++++++++++++++- 3 files changed, 251 insertions(+), 10 deletions(-) diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc index f646a70..51e089a 100644 --- a/src/kudu/integration-tests/test_workload.cc +++ b/src/kudu/integration-tests/test_workload.cc @@ -19,6 +19,7 @@ #include <memory> #include <ostream> +#include <string> #include <glog/logging.h> @@ -27,12 +28,14 @@ #include "kudu/client/schema.h" #include "kudu/client/write_op.h" #include "kudu/common/partial_row.h" +#include "kudu/common/txn_id.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/gutil/mathlimits.h" #include "kudu/gutil/port.h" #include "kudu/gutil/stl_util.h" #include "kudu/integration-tests/data_gen_util.h" #include "kudu/mini-cluster/mini_cluster.h" +#include "kudu/transactions/transactions.pb.h" #include "kudu/util/random.h" #include "kudu/util/status.h" #include "kudu/util/test_util.h" @@ -48,9 +51,12 @@ using kudu::client::KuduSchema; using kudu::client::KuduSession; using kudu::client::KuduTable; using kudu::client::KuduTableCreator; +using kudu::client::KuduTransaction; using kudu::client::KuduUpdate; using 
kudu::client::sp::shared_ptr; using kudu::cluster::MiniCluster; +using kudu::transactions::TxnTokenPB; +using std::string; using std::unique_ptr; using std::vector; @@ -71,6 +77,10 @@ TestWorkload::TestWorkload(MiniCluster* cluster, write_batch_size_(50), write_interval_millis_(0), write_timeout_millis_(20000), + txn_id_(TxnId::kInvalidTxnId), + begin_txn_(false), + commit_txn_(false), + rollback_txn_(false), fault_tolerant_(true), verify_num_rows_(true), read_errors_allowed_(false), @@ -141,7 +151,12 @@ void TestWorkload::WriteThread() { shared_ptr<KuduTable> table; OpenTable(&table); - shared_ptr<KuduSession> session = client_->NewSession(); + shared_ptr<KuduSession> session; + if (txn_) { + CHECK_OK(txn_->CreateSession(&session)); + } else { + session = client_->NewSession(); + } session->SetTimeoutMillis(write_timeout_millis_); CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); @@ -247,7 +262,8 @@ void TestWorkload::ReadThread() { // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify // anything except that a scan works. 
int64_t expected_min_rows = 0; - if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_) { + if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_ && + !begin_txn_ && !txn_id_.IsValid()) { expected_min_rows = rows_inserted_.Load(); } size_t row_count = 0; @@ -290,6 +306,12 @@ shared_ptr<KuduClient> TestWorkload::CreateClient() { } void TestWorkload::Setup() { + if (begin_txn_) { + CHECK(!txn_id_.IsValid()) << "Cannot begin txn and supply txn ID at the same time"; + } + if (commit_txn_ || rollback_txn_) { + CHECK(txn_id_.IsValid() || begin_txn_) << "Must participate in a txn to commit or abort"; + } if (!client_) { CHECK_OK(cluster_->CreateClient(&client_builder_, &client_)); } @@ -396,6 +418,29 @@ void TestWorkload::Start() { CHECK(!should_run_.Load()) << "Already started"; should_run_.Store(true); start_latch_.Reset(num_write_threads_ + num_read_threads_); + if (txn_id_.IsValid()) { + // TODO(awong): add an API to set the keepalive. For now just use an + // arbitrary, short default value. 
+ CHECK(!txn_); + TxnTokenPB txn_token_pb; + txn_token_pb.set_txn_id(txn_id_.value()); + txn_token_pb.set_enable_keepalive(true); + txn_token_pb.set_keepalive_millis(1000); + string txn_token_str; + CHECK(txn_token_pb.SerializeToString(&txn_token_str)); + CHECK_OK(KuduTransaction::Deserialize(client_, txn_token_str, &txn_)); + } + if (begin_txn_) { + CHECK(!txn_); + CHECK(!txn_id_.IsValid()); + CHECK_OK(client_->NewTransaction(&txn_)); + string txn_str; + CHECK_OK(txn_->Serialize(&txn_str)); + TxnTokenPB txn_token_pb; + CHECK(txn_token_pb.ParseFromString(txn_str)); + CHECK(txn_token_pb.has_txn_id()); + txn_id_ = TxnId(txn_token_pb.txn_id()); + } for (int i = 0; i < num_write_threads_; i++) { threads_.emplace_back(&TestWorkload::WriteThread, this); } @@ -422,6 +467,14 @@ void TestWorkload::StopAndJoin() { t.join(); } threads_.clear(); + if (txn_) { + if (commit_txn_) { + CHECK_OK(txn_->Commit()); + } + if (rollback_txn_) { + CHECK_OK(txn_->Rollback()); + } + } } } // namespace kudu diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h index fb86b76..57ef175 100644 --- a/src/kudu/integration-tests/test_workload.h +++ b/src/kudu/integration-tests/test_workload.h @@ -31,6 +31,7 @@ #include "kudu/client/client.h" #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep +#include "kudu/common/txn_id.h" #include "kudu/gutil/macros.h" #include "kudu/util/atomic.h" #include "kudu/util/countdown_latch.h" @@ -77,6 +78,44 @@ class TestWorkload { PartitioningType partitioning = PartitioningType::RANGE); ~TestWorkload(); + // Ingest the workload as part of the given transaction. set_begin_txn() must + // not be called if this is set. + void set_txn_id(int64_t txn_id) { + txn_id_ = TxnId(txn_id); + CHECK(txn_id_.IsValid()); + } + + // Ingest the workload as a part of a new transaction. set_txn_id() must not + // be called if this is set. 
+ void set_begin_txn() { + begin_txn_ = true; + CHECK(!txn_id_.IsValid()); + } + + // Commit the transaction that this workload is a part of upon calling + // StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set + // as well. If not set, but either set_begin_txn() or set_txn_id() is set, + // the workload will ingest as a part of the transaction, but not call + // commit on completion. + // + // set_rollback_txn() must not be called if this is set. + void set_commit_txn() { + commit_txn_ = true; + CHECK(!rollback_txn_); + } + + // Abort the transaction that this workload is a part of upon calling + // StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set + // as well. If not set, but either set_begin_txn() or set_txn_id() is set, + // the workload will ingest as a part of the transaction, but not call abort + // on completion. + // + // set_commit_txn() must not be called if this is set. + void set_rollback_txn() { + rollback_txn_ = true; + CHECK(!commit_txn_); + } + // Sets whether the read thread should crash if scanning to the cluster fails // for whatever reason. If set to true, errors will be populated in // 'read_errors_'. @@ -152,6 +191,9 @@ class TestWorkload { // Set whether we should attempt to verify the number of rows when scanning. // An incorrect number of rows may be indicative of a stale read. + // + // If either set_begin_txn() or set_txn_id() has been called, does not verify + // the number of rows. void set_verify_num_rows(bool should_verify) { verify_num_rows_ = should_verify; } @@ -250,8 +292,14 @@ class TestWorkload { // Delete created table, etc. Status Cleanup(); + int64_t txn_id() const { + CHECK(txn_id_.IsValid()); + return txn_id_.value(); + } + // Return the number of rows inserted so far. This may be called either - // during or after the write workload. + // during or after the write workload. If writing as a part of a transaction, + // these rows may have not been committed. 
int64_t rows_inserted() const { return rows_inserted_.Load(); } @@ -298,6 +346,10 @@ class TestWorkload { int write_batch_size_; int write_interval_millis_; int write_timeout_millis_; + TxnId txn_id_; + bool begin_txn_; + bool commit_txn_; + bool rollback_txn_; bool fault_tolerant_; bool verify_num_rows_; bool read_errors_allowed_; @@ -321,6 +373,8 @@ class TestWorkload { AtomicInt<int64_t> batches_completed_; AtomicInt<int32_t> sequential_key_gen_; + client::sp::shared_ptr<client::KuduTransaction> txn_; + std::vector<std::thread> threads_; mutable simple_spinlock read_error_lock_; diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc index b53c796..1aabbc1 100644 --- a/src/kudu/integration-tests/txn_write_ops-itest.cc +++ b/src/kudu/integration-tests/txn_write_ops-itest.cc @@ -60,6 +60,7 @@ #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/external_mini_cluster-itest-base.h" +#include "kudu/integration-tests/test_workload.h" #include "kudu/mini-cluster/external_mini_cluster.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/rpc/rpc_controller.h" @@ -201,6 +202,12 @@ Status CountRows(KuduTable* table, size_t* num_rows) { return Status::OK(); } +Status CountRows(KuduClient* client, const string& table_name, size_t* num_rows) { + shared_ptr<KuduTable> table; + RETURN_NOT_OK(client->OpenTable(table_name, &table)); + return CountRows(table.get(), num_rows); +} + Status GetSingleRowError(KuduSession* session) { vector<KuduError*> errors; ElementDeleter drop(&errors); @@ -932,7 +939,7 @@ class TxnOpDispatcherITest : public KuduTest { CHECK_OK(BuildSchema(&schema_)); } - void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) { + void SetupCluster(int num_tservers, int num_replicas = 0) { if (num_replicas == 0) { num_replicas = num_tservers; } @@ -944,7 +951,13 @@ class TxnOpDispatcherITest : public KuduTest { 
opts.num_tablet_servers = num_tservers; cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); ASSERT_OK(cluster_->StartSync()); + } + void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) { + if (num_replicas == 0) { + num_replicas = num_tservers; + } + NO_FATALS(SetupCluster(num_tservers, num_replicas)); KuduClientBuilder builder; builder.default_admin_operation_timeout(kTimeout); ASSERT_OK(cluster_->CreateClient(&builder, &client_)); @@ -992,14 +1005,15 @@ class TxnOpDispatcherITest : public KuduTest { } // Get all replicas of the test table. - vector<scoped_refptr<TabletReplica>> GetAllReplicas() const { + vector<scoped_refptr<TabletReplica>> GetAllReplicas(const string& table_name = "") const { + const string& target_table = table_name.empty() ? kTableName : table_name; vector<scoped_refptr<TabletReplica>> result; for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { auto* server = cluster_->mini_tablet_server(i)->server(); vector<scoped_refptr<TabletReplica>> replicas; server->tablet_manager()->GetTabletReplicas(&replicas); for (auto& r : replicas) { - if (r->tablet()->metadata()->table_name() == kTableName) { + if (r->tablet()->metadata()->table_name() == target_table) { result.emplace_back(std::move(r)); } } @@ -1008,10 +1022,11 @@ class TxnOpDispatcherITest : public KuduTest { } size_t GetTxnOpDispatchersTotalCount( - vector<scoped_refptr<TabletReplica>> replicas = {}) { + vector<scoped_refptr<TabletReplica>> replicas = {}, + const string& table_name = "") { if (replicas.empty()) { // No replicas were specified, get the list of all test table's replicas. 
- replicas = GetAllReplicas(); + replicas = GetAllReplicas(table_name); } size_t elem_count = 0; for (auto& r : replicas) { @@ -1041,8 +1056,8 @@ class TxnOpDispatcherITest : public KuduTest { typedef vector<std::shared_ptr<typename TabletReplica::TxnOpDispatcher>> OpDispatchers; typedef map<int64_t, OpDispatchers> OpDispatchersPerTxnId; - OpDispatchersPerTxnId GetTxnOpDispatchers() { - auto replicas = GetAllReplicas(); + OpDispatchersPerTxnId GetTxnOpDispatchers(const string& table_name = "") { + auto replicas = GetAllReplicas(table_name); OpDispatchersPerTxnId result; for (auto& r : replicas) { std::lock_guard<simple_spinlock> guard(r->txn_op_dispatchers_lock_); @@ -2191,4 +2206,123 @@ TEST_F(TxnOpDispatcherITest, DISABLED_TxnMultipleSingleRowsWithServerRestart) { } } +// Test beginning and aborting a transaction from the same test workload. +TEST_F(TxnOpDispatcherITest, TestBeginAbortTransactionalTestWorkload) { + NO_FATALS(SetupCluster(1)); + TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH); + w.set_num_replicas(1); + w.set_num_tablets(3); + w.set_begin_txn(); + w.set_rollback_txn(); + w.Setup(); + w.Start(); + const auto& table_name = w.table_name(); + while (w.rows_inserted() < 1000) { + SleepFor(MonoDelta::FromMilliseconds(5)); + } + // Each participant should have a dispatcher. + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, table_name)); + }); + w.StopAndJoin(); + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, table_name)); + }); + // By the end of it, we should have aborted the rows and they should not be + // visible to clients. + size_t num_rows; + ASSERT_OK(CountRows(w.client().get(), table_name, &num_rows)); + ASSERT_EQ(0, num_rows); +} + +// Test beginning and committing a transaction from the same test workload. 
+TEST_F(TxnOpDispatcherITest, TestBeginCommitTransactionalTestWorkload) { + NO_FATALS(SetupCluster(1)); + TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH); + w.set_num_replicas(1); + w.set_num_tablets(3); + w.set_begin_txn(); + w.set_commit_txn(); + w.Setup(); + w.Start(); + const auto& table_name = w.table_name(); + while (w.rows_inserted() < 1000) { + SleepFor(MonoDelta::FromMilliseconds(5)); + } + // Each participant should have a dispatcher. + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, table_name)); + }); + w.StopAndJoin(); + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, table_name)); + }); + // By the end of it, we should have committed the rows and they should be + // visible to clients. + size_t num_rows; + ASSERT_OK(CountRows(w.client().get(), table_name, &num_rows)); + ASSERT_EQ(w.rows_inserted(), num_rows); +} + +// Test beginning and committing a transaction from separate test workloads. +TEST_F(TxnOpDispatcherITest, TestSeparateBeginCommitTestWorkloads) { + NO_FATALS(SetupCluster(1)); + int64_t txn_id; + string first_table_name; + size_t first_rows_inserted; + { + TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH); + w.set_begin_txn(); + w.set_num_replicas(1); + w.set_num_tablets(3); + w.Setup(); + w.Start(); + while (w.rows_inserted() < 1000) { + SleepFor(MonoDelta::FromMilliseconds(5)); + } + first_table_name = w.table_name(); + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, first_table_name)); + }); + w.StopAndJoin(); + first_rows_inserted = w.rows_inserted(); + txn_id = w.txn_id(); + size_t num_rows; + ASSERT_OK(CountRows(w.client().get(), first_table_name, &num_rows)); + ASSERT_EQ(0, num_rows); + } + // Create a new workload, and insert as a part of the same transaction. 
+ { + TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH); + const auto& kSecondTableName = "default.second_table"; + w.set_txn_id(txn_id); + w.set_commit_txn(); + w.set_table_name(kSecondTableName); + w.set_num_replicas(1); + w.set_num_tablets(3); + w.Setup(); + w.Start(); + while (w.rows_inserted() < 1000) { + SleepFor(MonoDelta::FromMilliseconds(5)); + } + // We should have dispatchers for both tables. + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, first_table_name)); + ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, kSecondTableName)); + }); + w.StopAndJoin(); + // Once committed, the dispatchers should be unregistered. + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, first_table_name)); + ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, kSecondTableName)); + }); + size_t num_rows; + ASSERT_OK(CountRows(w.client().get(), first_table_name, &num_rows)); + ASSERT_EQ(first_rows_inserted, num_rows); + ASSERT_OK(CountRows(w.client().get(), kSecondTableName, &num_rows)); + ASSERT_EQ(w.rows_inserted(), num_rows); + } +} + + } // namespace kudu
