This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit efd8c4f165460b7fa337b8ebd1856b10bc274311 Author: Andrew Wong <[email protected]> AuthorDate: Sat Jun 13 22:31:42 2020 -0400 KUDU-2612 p2: introduce transaction status management This introduces the TxnStatusManager, which is backed by the TxnStatusTablet and exposes the following APIs, which will be called via RPC and will serve as many of the building blocks for orchestrating two-phase commit: - BeginTransaction: adds a new transaction under management of the TxnStatusManager - BeginCommitTransaction: transitions the state of a transaction from OPEN to COMMIT_IN_PROGRESS - AbortTransaction: transitions the state of a transaction from OPEN or COMMIT_IN_PROGRESS to ABORTED - RegisterParticipant: adds a participant to be associated with a specific transaction ID For completeness' sake w.r.t. defining the transaction state's enums, the following API is also added, which will be called by the TxnStatusManager itself upon determining a transaction has been completed. - FinalizeCommitTransaction: transitions the state of a transaction from COMMIT_IN_PROGRESS to COMMITTED This new abstraction mirrors that used by the CatalogManager, which uses copy-on-write locking to protect concurrent access to metadata while writes to the underlying TabletReplica (i.e. SysCatalogTable, or in this case, TxnStatusTablet) are being replicated. This is at least enough of a jumping-off point that we can begin plumbing this into the tablet servers and defining an RPC service around it -- there are still no facilities to create a TxnStatusManager. It should be noted that end-users will not call these methods directly, but rather through some layer of indirection (e.g. clients won't request a specific transaction ID, they'll just request to begin a transaction, and some intermediary layer will be in charge of getting an appropriate transaction ID). This should give us flexibility in changing the TxnStatusManager's interface moving forward. 
Change-Id: I371bb200cf65073ae3ac7cb311ab9a0b8344a636 Reviewed-on: http://gerrit.cloudera.org:8080/16044 Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Andrew Wong <[email protected]> --- src/kudu/master/catalog_manager.h | 16 - src/kudu/transactions/CMakeLists.txt | 3 + src/kudu/transactions/txn_status_entry.cc | 58 +++ src/kudu/transactions/txn_status_entry.h | 116 ++++++ src/kudu/transactions/txn_status_manager-test.cc | 443 +++++++++++++++++++++++ src/kudu/transactions/txn_status_manager.cc | 262 ++++++++++++++ src/kudu/transactions/txn_status_manager.h | 146 ++++++++ src/kudu/util/cow_object.h | 47 +++ 8 files changed, 1075 insertions(+), 16 deletions(-) diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h index 6eca87f..31cda7d 100644 --- a/src/kudu/master/catalog_manager.h +++ b/src/kudu/master/catalog_manager.h @@ -398,22 +398,6 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> { DISALLOW_COPY_AND_ASSIGN(TableInfo); }; -// Helper to manage locking on the persistent metadata of TabletInfo or TableInfo. -template<class MetadataClass> -class MetadataLock : public CowLock<typename MetadataClass::cow_state> { - public: - typedef CowLock<typename MetadataClass::cow_state> super; - MetadataLock() - : super() { - } - MetadataLock(MetadataClass* info, LockMode mode) - : super(DCHECK_NOTNULL(info)->mutable_metadata(), mode) { - } - MetadataLock(const MetadataClass* info, LockMode mode) - : super(&(DCHECK_NOTNULL(info))->metadata(), mode) { - } -}; - // Helper to manage locking on the persistent metadata of multiple TabletInfo // or TableInfo objects. 
template<class MetadataClass> diff --git a/src/kudu/transactions/CMakeLists.txt b/src/kudu/transactions/CMakeLists.txt index 4f35408..c5b3e89 100644 --- a/src/kudu/transactions/CMakeLists.txt +++ b/src/kudu/transactions/CMakeLists.txt @@ -30,6 +30,8 @@ target_link_libraries(transactions_proto ) set(TRANSACTIONS_SRCS + txn_status_entry.cc + txn_status_manager.cc txn_status_tablet.cc) add_library(transactions ${TRANSACTIONS_SRCS}) @@ -42,4 +44,5 @@ target_link_libraries(transactions ) SET_KUDU_TEST_LINK_LIBS(transactions tablet_test_util) +ADD_KUDU_TEST(txn_status_manager-test) ADD_KUDU_TEST(txn_status_tablet-test) diff --git a/src/kudu/transactions/txn_status_entry.cc b/src/kudu/transactions/txn_status_entry.cc new file mode 100644 index 0000000..ed56c19 --- /dev/null +++ b/src/kudu/transactions/txn_status_entry.cc @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/transactions/txn_status_entry.h" + +#include <mutex> +#include <string> + +#include <glog/logging.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" + +using std::string; +using std::vector; + +namespace kudu { +namespace transactions { + +scoped_refptr<ParticipantEntry> TransactionEntry::GetOrCreateParticipant( + const string& tablet_id) { + DCHECK(metadata_.IsReadLocked()); + DCHECK_EQ(TxnStatePB::OPEN, metadata_.state().pb.state()); + + // In the expected case, this participant hasn't been added; add it. + std::lock_guard<simple_spinlock> l(lock_); + scoped_refptr<ParticipantEntry> participant = FindPtrOrNull(participants_, tablet_id); + if (PREDICT_TRUE(!participant)) { + participant = new ParticipantEntry(); + EmplaceOrDie(&participants_, tablet_id, participant); + } + return participant; +} + +vector<string> TransactionEntry::GetParticipantIds() const { + std::lock_guard<simple_spinlock> l(lock_); + vector<string> ret; + AppendKeysFromMap(participants_, &ret); + return ret; +} + +} // namespace transactions +} // namespace kudu diff --git a/src/kudu/transactions/txn_status_entry.h b/src/kudu/transactions/txn_status_entry.h new file mode 100644 index 0000000..69c42ad --- /dev/null +++ b/src/kudu/transactions/txn_status_entry.h @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <cstdint> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/transactions/transactions.pb.h" +#include "kudu/util/cow_object.h" +#include "kudu/util/locks.h" + +namespace kudu { +namespace transactions { + +typedef std::pair<std::string, TxnParticipantEntryPB> ParticipantIdAndPB; + +// Representations of entries in the transaction status table. Currently there +// are two entry types: +// - Transaction Entries: these indicate the existence of a transaction, and +// encapsulate metadata that pertains to the entire transaction +// (e.g. owner, commit status, commit timestamp). +// - Participant Entries: these indicate the existence of a tablet that is +// participating in a transaction. +// +// There is a 1:N relationship between transaction entries and participant +// entries. Represents the metadata persisted with a participant entry in the +// transaction status table. 
+struct PersistentParticipantEntry { + TxnParticipantEntryPB pb; +}; +class ParticipantEntry : public RefCountedThreadSafe<ParticipantEntry> { + public: + typedef PersistentParticipantEntry cow_state; + + ParticipantEntry() {} + const CowObject<PersistentParticipantEntry>& metadata() const { return metadata_; } + CowObject<PersistentParticipantEntry>* mutable_metadata() { return &metadata_; } + + private: + friend class RefCountedThreadSafe<ParticipantEntry>; + ~ParticipantEntry() {} + + // Mutable state for this participant with concurrent access controlled via + // copy-on-write locking. + CowObject<PersistentParticipantEntry> metadata_; +}; + +// Represents the metadata persisted with a status entry in the transaction +// status table. +struct PersistentTransactionEntry { + TxnStatusEntryPB pb; +}; +class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> { + public: + typedef PersistentTransactionEntry cow_state; + + TransactionEntry(int64_t txn_id, std::string user) + : txn_id_(txn_id), + user_(std::move(user)) {} + const CowObject<PersistentTransactionEntry>& metadata() const { return metadata_; } + CowObject<PersistentTransactionEntry>* mutable_metadata() { return &metadata_; } + + // Adds a participant with the given tablet ID, or returns the one if it + // already exists. + scoped_refptr<ParticipantEntry> GetOrCreateParticipant(const std::string& tablet_id); + + // Returns the list of tablet IDs associated with this transaction. + std::vector<std::string> GetParticipantIds() const; + + const std::string& user() const { + return user_; + } + + private: + friend class RefCountedThreadSafe<TransactionEntry>; + ~TransactionEntry() {} + + const int64_t txn_id_; + + // While this is redundant with the field in the protobuf, it's convenient to + // cache this so we don't have to lock this entry to get the user. + const std::string user_; + + // Protects participants_. 
If adding a new participant, the entry should also + // be locked in read mode and the transaction should be open. + mutable simple_spinlock lock_; + std::unordered_map<std::string, scoped_refptr<ParticipantEntry>> participants_; + + // Mutable state for the transaction status record with concurrent access + // controlled via copy-on-write locking. + CowObject<PersistentTransactionEntry> metadata_; +}; + +typedef MetadataLock<TransactionEntry> TransactionEntryLock; +typedef MetadataLock<ParticipantEntry> ParticipantEntryLock; + +} // namespace transactions +} // namespace kudu diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc new file mode 100644 index 0000000..c912e9c --- /dev/null +++ b/src/kudu/transactions/txn_status_manager-test.cc @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/transactions/txn_status_manager.h" + +#include <algorithm> +#include <cstdint> +#include <map> +#include <memory> +#include <mutex> +#include <numeric> +#include <set> +#include <string> +#include <thread> +#include <unordered_set> +#include <utility> +#include <vector> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/consensus/raft_consensus.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/tablet/tablet-test-util.h" +#include "kudu/tablet/tablet_replica-test-base.h" +#include "kudu/transactions/transactions.pb.h" +#include "kudu/transactions/txn_status_tablet.h" +#include "kudu/util/barrier.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/locks.h" +#include "kudu/util/metrics.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using kudu::consensus::ConsensusBootstrapInfo; +using kudu::tablet::TabletReplicaTestBase; +using std::string; +using std::thread; +using std::unique_ptr; +using std::unordered_set; +using std::vector; + +METRIC_DECLARE_entity(tablet); + +namespace kudu { +namespace transactions { + +namespace { +const char* kOwner = "gru"; +const char* kParticipant = "minion"; +string ParticipantId(int i) { + return Substitute("$0$1", kParticipant, i); +} +} // anonymous namespace + +class TxnStatusManagerTest : public TabletReplicaTestBase { + public: + TxnStatusManagerTest() + : TabletReplicaTestBase(TxnStatusTablet::GetSchemaWithoutIds()) {} + + void SetUp() override { + NO_FATALS(TabletReplicaTestBase::SetUp()); + ConsensusBootstrapInfo info; + ASSERT_OK(StartReplicaAndWaitUntilLeader(info)); + txn_manager_.reset(new TxnStatusManager(tablet_replica_.get())); + } + protected: + unique_ptr<TxnStatusManager> txn_manager_; +}; + +// Test our ability to start transactions and register participants, 
with some +// corner cases. +TEST_F(TxnStatusManagerTest, TestStartTransactions) { + const string kParticipant1 = ParticipantId(1); + const string kParticipant2 = ParticipantId(2); + const ParticipantIdsByTxnId expected_prts_by_txn_id = { + { 1, {} }, + { 3, { kParticipant1, kParticipant2 } }, + }; + + ASSERT_TRUE(txn_manager_->GetParticipantsByTxnIdForTests().empty()); + + for (const auto& txn_id_and_prts : expected_prts_by_txn_id) { + const auto& txn_id = txn_id_and_prts.first; + ASSERT_OK(txn_manager_->BeginTransaction(txn_id, kOwner)); + for (const auto& prt : txn_id_and_prts.second) { + ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner)); + } + } + // Registering a participant that's already open is harmless, presuming the + // participant is still open. + ASSERT_OK(txn_manager_->RegisterParticipant(3, kParticipant1, kOwner)); + + // Starting a transaction that's already been started should result in an + // error, even if it's not currently in flight. + Status s = txn_manager_->BeginTransaction(1, kOwner); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + s = txn_manager_->BeginTransaction(2, kOwner); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + + // Registering participants to transactions that don't exist should also + // result in errors. + s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner); + ASSERT_TRUE(s.IsNotFound()) << s.ToString(); + + // The underlying participants map should only reflect the successful + // operations. + ASSERT_EQ(expected_prts_by_txn_id, + txn_manager_->GetParticipantsByTxnIdForTests()); + ASSERT_EQ(3, txn_manager_->highest_txn_id()); + { + // Reload the TxnStatusManager from disk and verify the state. 
+ TxnStatusManager txn_manager_reloaded(tablet_replica_.get()); + ASSERT_OK(txn_manager_reloaded.LoadFromTablet()); + ASSERT_EQ(expected_prts_by_txn_id, + txn_manager_reloaded.GetParticipantsByTxnIdForTests()); + ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id()); + } + + // Now rebuild the underlying replica and rebuild the TxnStatusManager. + ASSERT_OK(RestartReplica()); + { + TxnStatusManager txn_manager_reloaded(tablet_replica_.get()); + ASSERT_OK(txn_manager_reloaded.LoadFromTablet()); + ASSERT_EQ(expected_prts_by_txn_id, + txn_manager_reloaded.GetParticipantsByTxnIdForTests()); + ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id()); + } +} + +TEST_F(TxnStatusManagerTest, TestStartTransactionsConcurrently) { + simple_spinlock lock; + const int kParallelTxnsPerBatch = 10; + const int kBatchesToStart = 10; + vector<int64_t> successful_txn_ids; + successful_txn_ids.reserve(kParallelTxnsPerBatch * kBatchesToStart); + + // Put together the batches of transaction IDs we're going to start. + vector<vector<int64_t>> txns_to_insert; + for (int i = 0; i < kBatchesToStart; i++) { + vector<int64_t> txns_in_batch(kParallelTxnsPerBatch); + std::iota(txns_in_batch.begin(), txns_in_batch.end(), i * kParallelTxnsPerBatch); + std::random_shuffle(txns_in_batch.begin(), txns_in_batch.end()); + txns_to_insert.emplace_back(std::move(txns_in_batch)); + } + + // From multiple threads, begin txns and record any that return with a + // success. + vector<thread> threads; + vector<std::unique_ptr<Barrier>> barriers; + threads.reserve(kParallelTxnsPerBatch); + barriers.reserve(kBatchesToStart); + for (int b = 0; b < kBatchesToStart; b++) { + // NOTE: we allocate these on the heap since we disallow assignment of + // barriers. 
+ barriers.emplace_back(new Barrier(kParallelTxnsPerBatch)); + } + for (int i = 0; i < kParallelTxnsPerBatch; i++) { + threads.emplace_back([&, i] { + for (int b = 0; b < kBatchesToStart; b++) { + // Synchronize the threads so we're inserting to a single range at a + // time. + barriers[b]->Wait(); + auto txn_id = txns_to_insert[b][i]; + Status s = txn_manager_->BeginTransaction(txn_id, kOwner); + if (s.ok()) { + std::lock_guard<simple_spinlock> l(lock); + successful_txn_ids.emplace_back(txn_id); + } + } + }); + } + for (auto& t : threads) { + t.join(); + } + + // Verify that only txns that returned success ended up in the + // TxnStatusManager + ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests(); + EXPECT_EQ(successful_txn_ids.size(), prts_by_txn_id.size()); + for (const auto& txn_id : successful_txn_ids) { + EXPECT_TRUE(ContainsKey(prts_by_txn_id, txn_id)); + } + // As a sanity check, there should have been at least one success per batch, + // though there may have been multiple failures if the threads raced for the + // highest transaction ID. + ASSERT_GE(successful_txn_ids.size(), kBatchesToStart); +} + +TEST_F(TxnStatusManagerTest, TestRegisterParticipantsConcurrently) { + const int kParticipantsInParallel = 10; + const int kUniqueParticipantIds = 5; + simple_spinlock lock; + vector<string> successful_participants; + successful_participants.reserve(kParticipantsInParallel); + + const int64_t kTxnId = 1; + vector<thread> threads; + CountDownLatch begun_txn(1); + threads.reserve(1 + kParticipantsInParallel); + threads.emplace_back([&] { + CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner)); + begun_txn.CountDown(); + }); + + // Register a bunch of participants in parallel, including some duplicates, + // keeping track of the ones that yielded success. 
+ for (int i = 0; i < kParticipantsInParallel; i++) { + threads.emplace_back([&, i] { + if (i % 2) { + // In some threads, wait for the transaction to have begun, to ensure + // at least some of the participant registrations succeed. + begun_txn.Wait(); + } + string prt = ParticipantId(i % kUniqueParticipantIds); + Status s = txn_manager_->RegisterParticipant(kTxnId, prt, kOwner); + if (s.ok()) { + std::lock_guard<simple_spinlock> l(lock); + successful_participants.emplace_back(std::move(prt)); + } + }); + } + for (auto& t : threads) { + t.join(); + } + + // Verify that only participant registrations that returned success ended up + // in the TxnStatusManager. + ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests(); + ASSERT_EQ(1, prts_by_txn_id.size()); + const auto& txn_id_and_prts = *prts_by_txn_id.begin(); + ASSERT_EQ(kTxnId, txn_id_and_prts.first); + + const auto& participants = txn_id_and_prts.second; + unordered_set<string> successful_prts( + successful_participants.begin(), successful_participants.end()); + EXPECT_EQ(successful_prts.size(), participants.size()); + + for (const auto& prt : participants) { + EXPECT_TRUE(ContainsKey(successful_prts, prt)); + } + ASSERT_GT(successful_prts.size(), 0); +} + +TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) { + const int kNumTransactions = 10; + const int kNumUpdatesInParallel = 20; + for (int i = 0; i < kNumTransactions; i++) { + ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner)); + } + typedef std::pair<int64_t, TxnStatePB> IdAndUpdate; + vector<IdAndUpdate> all_updates; + for (int i = 0; i < kNumTransactions; i++) { + all_updates.emplace_back(std::make_pair(i, TxnStatePB::ABORTED)); + all_updates.emplace_back(std::make_pair(i, TxnStatePB::COMMIT_IN_PROGRESS)); + all_updates.emplace_back(std::make_pair(i, TxnStatePB::COMMITTED)); + } + ThreadSafeRandom rng(SeedRandom()); + vector<IdAndUpdate> updates; + ReservoirSample(all_updates, kNumUpdatesInParallel, 
std::set<IdAndUpdate>(), &rng, &updates); + vector<Status> statuses(kNumUpdatesInParallel); + vector<thread> threads; + threads.reserve(kNumUpdatesInParallel); + // Start a bunch of threads that update transaction states. + for (int i = 0; i < kNumUpdatesInParallel; i++) { + threads.emplace_back([&, i] { + const auto& txn_id = updates[i].first; + switch (updates[i].second) { + case TxnStatePB::ABORTED: + statuses[i] = txn_manager_->AbortTransaction(txn_id, kOwner); + break; + case TxnStatePB::COMMIT_IN_PROGRESS: + statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner); + break; + case TxnStatePB::COMMITTED: + statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id); + break; + default: + FAIL() << "bad update"; + } + }); + } + for (auto& t : threads) { + t.join(); + } + + // Collect the transaction IDs per successful update. + unordered_set<int64_t> txns_with_abort; + unordered_set<int64_t> txns_with_begin_commit; + unordered_set<int64_t> txns_with_finalize_commit; + for (int i = 0; i < kNumUpdatesInParallel; i++) { + const auto& txn_id = updates[i].first; + if (!statuses[i].ok()) { + continue; + } + switch (updates[i].second) { + case TxnStatePB::ABORTED: + EmplaceIfNotPresent(&txns_with_abort, txn_id); + break; + case TxnStatePB::COMMIT_IN_PROGRESS: + EmplaceIfNotPresent(&txns_with_begin_commit, txn_id); + break; + case TxnStatePB::COMMITTED: + EmplaceIfNotPresent(&txns_with_finalize_commit, txn_id); + break; + default: + FAIL() << "bad update"; + } + } + for (int i = 0; i < kNumTransactions; i++) { + // If there's a finalize commit and an abort commit, only one can succeed. + if (ContainsKey(txns_with_abort, i)) { + ASSERT_FALSE(ContainsKey(txns_with_finalize_commit, i)); + } + // If there's a finalize commit, it can only succeed if there's also been a + // successful request to begin the commit. 
+ if (ContainsKey(txns_with_finalize_commit, i)) { + ASSERT_TRUE(ContainsKey(txns_with_begin_commit, i)); + ASSERT_FALSE(ContainsKey(txns_with_abort, i)); + } + } +} + +// Test that performing actions as the wrong user will return errors. +TEST_F(TxnStatusManagerTest, TestWrongUser) { + const string kWrongUser = "stranger"; + ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner)); + ASSERT_OK(txn_manager_->RegisterParticipant(1, ParticipantId(1), kOwner)); + + // First, any other call to begin the transaction should be rejected, + // regardless of user. + Status s = txn_manager_->BeginTransaction(1, kWrongUser); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + + // All actions should be rejected if performed by the wrong user. + s = txn_manager_->RegisterParticipant(1, ParticipantId(1), kWrongUser); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + s = txn_manager_->RegisterParticipant(1, ParticipantId(2), kWrongUser); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + s = txn_manager_->BeginCommitTransaction(1, kWrongUser); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + s = txn_manager_->AbortTransaction(1, kWrongUser); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests(); + ParticipantIdsByTxnId kExpectedPrtsByTxnId = { { 1, { ParticipantId(1) } } }; + ASSERT_EQ(kExpectedPrtsByTxnId, prts_by_txn_id); +} + +// Test that we can only update a transaction's state when it's in an +// appropriate state. +TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) { + const int64_t kTxnId1 = 1; + ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner)); + + // Redundant calls are benign. 
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner)); + ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner)); + ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner)); + ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner)); + + // We can't begin or finalize a commit if we've aborted. + Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); + s = txn_manager_->FinalizeCommitTransaction(kTxnId1); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); + + // We can't finalize a commit that hasn't begun committing. + const int64_t kTxnId2 = 2; + ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner)); + s = txn_manager_->FinalizeCommitTransaction(kTxnId2); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); + + // We can't abort a transaction that has finished committing. + ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner)); + ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2)); + s = txn_manager_->AbortTransaction(kTxnId2, kOwner); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); + + // Redundant finalize calls are also benign. + ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2)); + + // Calls to begin committing should return an error if we've already + // finalized the commit. + s = txn_manager_->BeginCommitTransaction(kTxnId2, kOwner); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); +} + +// Test that we can only add participants to a transaction when it's in an +// appropriate state. +TEST_F(TxnStatusManagerTest, TestRegisterParticipantsWithStates) { + const int64_t kTxnId1 = 1; + + // We can't register a participant to a transaction that hasn't started. 
+ Status s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner); + ASSERT_TRUE(s.IsNotFound()) << s.ToString(); + + ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner)); + ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner)); + + // Registering the same participant is idempotent and benign. + ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner)); + + // We can't register participants when we've already begun committing. + ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner)); + s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); + + // We can't register participants when we've finished committing. + ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1)); + s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); + + // We can't register participants when we've aborted the transaction. + const int64_t kTxnId2 = 2; + ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner)); + ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(1), kOwner)); + ASSERT_OK(txn_manager_->AbortTransaction(kTxnId2, kOwner)); + s = txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(2), kOwner); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); +} + +} // namespace transactions +} // namespace kudu + diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc new file mode 100644 index 0000000..a7d85ed --- /dev/null +++ b/src/kudu/transactions/txn_status_manager.cc @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/transactions/txn_status_manager.h" + +#include <algorithm> +#include <mutex> +#include <string> +#include <utility> +#include <vector> + +#include <boost/optional/optional.hpp> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/transactions/transactions.pb.h" +#include "kudu/util/cow_object.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/status.h" + +using kudu::pb_util::SecureShortDebugString; +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace transactions { + +void TxnStatusManagerBuildingVisitor::VisitTransactionEntries( + int64_t txn_id, TxnStatusEntryPB status_entry_pb, + vector<ParticipantIdAndPB> participants) { + scoped_refptr<TransactionEntry> txn = new TransactionEntry(txn_id, status_entry_pb.user()); + { + TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE); + txn_lock.mutable_data()->pb = std::move(status_entry_pb); + txn_lock.Commit(); + } + { + // Lock the transaction while we build the participants. + TransactionEntryLock txn_lock(txn.get(), LockMode::READ); + for (auto& participant_and_state : participants) { + const auto& prt_id = participant_and_state.first; + auto& prt_entry_pb = participant_and_state.second; + + // Register a participant entry for this transaction. 
+ auto prt = txn->GetOrCreateParticipant(prt_id); + ParticipantEntryLock l(prt.get(), LockMode::WRITE); + l.mutable_data()->pb = std::move(prt_entry_pb); + l.Commit(); + } + } + // NOTE: this method isn't meant to be thread-safe, hence the lack of + // locking around 'txns_by_id_' and 'highest_txn_id_'. + EmplaceOrDie(&txns_by_id_, txn_id, std::move(txn)); + highest_txn_id_ = std::max(highest_txn_id_, txn_id); +} + +void TxnStatusManagerBuildingVisitor::Release( + int64_t* highest_txn_id, TransactionsMap* txns_by_id) { + *highest_txn_id = highest_txn_id_; + *txns_by_id = std::move(txns_by_id_); +} + +Status TxnStatusManager::LoadFromTablet() { + TxnStatusManagerBuildingVisitor v; + RETURN_NOT_OK(status_tablet_.VisitTransactions(&v)); + int64_t highest_txn_id; + TransactionsMap txns_by_id; + v.Release(&highest_txn_id, &txns_by_id); + + std::lock_guard<simple_spinlock> l(lock_); + highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_); + txns_by_id_ = std::move(txns_by_id); + return Status::OK(); +} + +Status TxnStatusManager::GetTransaction(int64_t txn_id, + const boost::optional<string>& user, + scoped_refptr<TransactionEntry>* txn) const { + std::lock_guard<simple_spinlock> l(lock_); + scoped_refptr<TransactionEntry> ret = FindPtrOrNull(txns_by_id_, txn_id); + if (PREDICT_FALSE(!ret)) { + return Status::NotFound( + Substitute("transaction ID $0 not found, current highest txn ID: $1", + txn_id, highest_txn_id_)); + } + if (PREDICT_FALSE(user && ret->user() != *user)) { + return Status::NotAuthorized( + Substitute("transaction ID $0 not owned by $1", txn_id, *user)); + } + *txn = std::move(ret); + return Status::OK(); +} + +Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user) { + { + // First, make sure the requested ID is viable: it must exceed every ID attempted so far. 
+ std::lock_guard<simple_spinlock> l(lock_); + if (PREDICT_FALSE(txn_id <= highest_txn_id_)) { + return Status::InvalidArgument( + Substitute("transaction ID $0 is not higher than the highest ID so far: $1", + txn_id, highest_txn_id_)); + } + highest_txn_id_ = txn_id; + } + + // NOTE: it's fine if these underlying tablet ops race with one another -- + // since we've serialized the transaction ID checking above, we're guaranteed + // that at most one call to start a given transaction ID can succeed. + + // Write an entry to the status tablet for this transaction. + RETURN_NOT_OK(status_tablet_.AddNewTransaction(txn_id, user)); + + // Now that we've successfully persisted the new transaction ID, initialize + // the in-memory state and make it visible to clients. + scoped_refptr<TransactionEntry> txn = new TransactionEntry(txn_id, user); + { + TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE); + txn_lock.mutable_data()->pb.set_state(TxnStatePB::OPEN); + txn_lock.mutable_data()->pb.set_user(user); + txn_lock.Commit(); + } + std::lock_guard<simple_spinlock> l(lock_); + EmplaceOrDie(&txns_by_id_, txn_id, std::move(txn)); + return Status::OK(); +} + +Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& user) { + scoped_refptr<TransactionEntry> txn; + RETURN_NOT_OK(GetTransaction(txn_id, user, &txn)); + + TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE); + const auto& pb = txn_lock.data().pb; + const auto& state = pb.state(); + if (state == TxnStatePB::COMMIT_IN_PROGRESS) { + return Status::OK(); + } + if (PREDICT_FALSE(state != TxnStatePB::OPEN)) { + return Status::IllegalState( + Substitute("transaction ID $0 is not open: $1", + txn_id, SecureShortDebugString(pb))); + } + auto* mutable_data = txn_lock.mutable_data(); + mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS); + RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb)); + txn_lock.Commit(); + return Status::OK(); +} + +Status 
TxnStatusManager::FinalizeCommitTransaction(int64_t txn_id) { + scoped_refptr<TransactionEntry> txn; + RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn)); + + TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE); + const auto& pb = txn_lock.data().pb; + const auto& state = pb.state(); + if (state == TxnStatePB::COMMITTED) { + return Status::OK(); + } + if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS)) { + return Status::IllegalState( + Substitute("transaction ID $0 is not committing: $1", + txn_id, SecureShortDebugString(pb))); + } + auto* mutable_data = txn_lock.mutable_data(); + mutable_data->pb.set_state(TxnStatePB::COMMITTED); + RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb)); + txn_lock.Commit(); + return Status::OK(); +} + +Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& user) { + scoped_refptr<TransactionEntry> txn; + RETURN_NOT_OK(GetTransaction(txn_id, user, &txn)); + + TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE); + const auto& pb = txn_lock.data().pb; + const auto& state = pb.state(); + if (state == TxnStatePB::ABORTED) { + return Status::OK(); + } + if (PREDICT_FALSE(state != TxnStatePB::OPEN && + state != TxnStatePB::COMMIT_IN_PROGRESS)) { + return Status::IllegalState( + Substitute("transaction ID $0 cannot be aborted: $1", + txn_id, SecureShortDebugString(pb))); + } + auto* mutable_data = txn_lock.mutable_data(); + mutable_data->pb.set_state(TxnStatePB::ABORTED); + RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb)); + txn_lock.Commit(); + return Status::OK(); +} + +Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string& tablet_id, + const string& user) { + scoped_refptr<TransactionEntry> txn; + RETURN_NOT_OK(GetTransaction(txn_id, user, &txn)); + + // Lock the transaction in read mode and check that it's open. If the + // transaction isn't open, e.g. because a commit is already in progress, + // return an error. 
+ TransactionEntryLock txn_lock(txn.get(), LockMode::READ); + const auto& txn_state = txn_lock.data().pb.state(); + if (PREDICT_FALSE(txn_state != TxnStatePB::OPEN)) { + return Status::IllegalState( + Substitute("transaction ID $0 not open: $1", + txn_id, TxnStatePB_Name(txn_state))); + } + + auto participant = txn->GetOrCreateParticipant(tablet_id); + ParticipantEntryLock prt_lock(participant.get(), LockMode::WRITE); + const auto& prt_state = prt_lock.data().pb.state(); + if (prt_state == TxnStatePB::OPEN) { + // If an open participant already exists, there's nothing more to do. + return Status::OK(); + } + if (PREDICT_FALSE(prt_state != TxnStatePB::UNKNOWN)) { + // If the participant is otherwise initialized, e.g. aborted, committing, + // etc, adding the participant again should fail. + return Status::IllegalState("participant entry already exists"); + } + prt_lock.mutable_data()->pb.set_state(TxnStatePB::OPEN); + + // Write the new participant entry. + RETURN_NOT_OK(status_tablet_.AddNewParticipant(txn_id, tablet_id)); + + // Now that we've persisted the new participant to disk, update the in-memory + // state to denote the participant is open. 
+ prt_lock.Commit(); + return Status::OK(); +} + +ParticipantIdsByTxnId TxnStatusManager::GetParticipantsByTxnIdForTests() const { + ParticipantIdsByTxnId ret; + std::lock_guard<simple_spinlock> l(lock_); + for (const auto& id_and_txn : txns_by_id_) { + const auto& txn = id_and_txn.second; + vector<string> prt_ids = txn->GetParticipantIds(); + std::sort(prt_ids.begin(), prt_ids.end()); + EmplaceOrDie(&ret, id_and_txn.first, std::move(prt_ids)); + } + return ret; +} + +} // namespace transactions +} // namespace kudu diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h new file mode 100644 index 0000000..8162c13 --- /dev/null +++ b/src/kudu/transactions/txn_status_manager.h @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <cstdint> +#include <map> +#include <mutex> +#include <string> +#include <unordered_map> +#include <vector> + +#include <boost/optional/optional.hpp> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/transactions/txn_status_entry.h" +#include "kudu/transactions/txn_status_tablet.h" +#include "kudu/util/locks.h" +#include "kudu/util/status.h" + +namespace kudu { + +namespace tablet { +class TabletReplica; +} // namespace tablet + +namespace transactions { + +class TxnStatusEntryPB; + +// Maps the transaction ID to the corresponding TransactionEntry. +typedef std::unordered_map<int64_t, scoped_refptr<TransactionEntry>> TransactionsMap; + +// Maps the transaction ID to the transaction's participants' tablet IDs. This +// is convenient to use in testing, given its relative ease of construction. +typedef std::map<int64_t, std::vector<std::string>> ParticipantIdsByTxnId; + +// Visitor used to iterate over and load into memory the existing state from a +// status tablet. +class TxnStatusManagerBuildingVisitor : public TransactionsVisitor { + public: + // Builds a TransactionEntry for the given metadata and keeps track of it in + // txns_by_id_. This is not thread-safe -- callers should ensure only a + // single thread calls it at once. + void VisitTransactionEntries(int64_t txn_id, TxnStatusEntryPB status_entry_pb, + std::vector<ParticipantIdAndPB> participants) override; + + // Releases the highest transaction ID seen and the transactions map (which is + // moved out of this visitor) to the caller. Call once, after visiting completes. + void Release(int64_t* highest_txn_id, TransactionsMap* txns_by_id); + private: + int64_t highest_txn_id_ = -1; + TransactionsMap txns_by_id_; +}; + +// Manages ongoing transactions and participants therein, backed by an +// underlying tablet. 
+class TxnStatusManager { + public: + explicit TxnStatusManager(tablet::TabletReplica* tablet_replica) + : highest_txn_id_(-1), + status_tablet_(tablet_replica) {} + // Loads the contents of the status tablet into memory. + Status LoadFromTablet(); + + // Writes an entry to the status tablet and creates a transaction in memory. + // Returns an error if the same or a higher transaction ID has already been + // attempted (even if that attempt failed), which helps ensure that at most one + // call to this method will succeed for a given transaction ID. + // + // TODO(awong): consider computing the next available transaction ID in this + // partition and using it in case this transaction is already used, or having + // callers forward a request for the next-highest transaction ID. + Status BeginTransaction(int64_t txn_id, const std::string& user); + + // Begins committing the given transaction; a no-op if a commit is already in + // progress. Errors if the transaction doesn't exist, isn't open, or isn't owned by 'user'. + Status BeginCommitTransaction(int64_t txn_id, const std::string& user); + + // Finalizes the commit of the transaction, returning an error if the + // transaction isn't in an appropriate state. + // + // Unlike the other transaction life-cycle calls, this isn't user-initiated, + // so it doesn't take a user. + // + // TODO(awong): add a commit timestamp. + Status FinalizeCommitTransaction(int64_t txn_id); + + // Aborts the given transaction, returning an error if the transaction + // doesn't exist, is committed or not yet opened, or isn't owned by the given + // user. + Status AbortTransaction(int64_t txn_id, const std::string& user); + + // Creates an in-memory participant, writes an entry to the status tablet, and + // attaches the in-memory participant to the transaction. + // + // If the transaction is open, it is ensured to be active for the duration of + // this call. Returns an error if the given transaction isn't open. 
+ Status RegisterParticipant(int64_t txn_id, const std::string& tablet_id, + const std::string& user); + + // Populates a map from transaction ID to the sorted list of participants + // associated with that transaction ID. + ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const; + + int64_t highest_txn_id() const { + std::lock_guard<simple_spinlock> l(lock_); + return highest_txn_id_; + } + + private: + // Returns the transaction entry, returning an error if the transaction ID + // doesn't exist or if 'user' is specified but isn't the owner of the + // transaction. + Status GetTransaction(int64_t txn_id, const boost::optional<std::string>& user, + scoped_refptr<TransactionEntry>* txn) const; + + // Protects 'highest_txn_id_' and 'txns_by_id_'. + mutable simple_spinlock lock_; + + // The highest transaction ID seen by this status manager so far. Requests to + // create a new transaction must provide an ID higher than this ID. + int64_t highest_txn_id_; + + // Tracks the currently on-going transactions. + TransactionsMap txns_by_id_; + + // The access to underlying storage. + TxnStatusTablet status_tablet_; +}; + +} // namespace transactions +} // namespace kudu diff --git a/src/kudu/util/cow_object.h b/src/kudu/util/cow_object.h index 159a8bb..c03721c 100644 --- a/src/kudu/util/cow_object.h +++ b/src/kudu/util/cow_object.h @@ -434,4 +434,51 @@ class CowGroupLock { DISALLOW_COPY_AND_ASSIGN(CowGroupLock); }; +// Helper to manage locking on the metadata protected by a CowLock. 
+// +// Example: +// +// struct MetadataState { +// MessagePB pb; +// }; +// +// class Metadata { +// public: +// typedef MetadataState cow_state; +// +// const CowObject<MetadataState>& metadata() const { return metadata_; } +// CowObject<MetadataState>* mutable_metadata() { return &metadata_; } +// +// private: +// CowObject<MetadataState> metadata_; +// }; +// +// Sample usage: +// +// Metadata m; +// MessagePB pb_copy; +// { +// MetadataLock<Metadata> l(&m, LockMode::READ); +// pb_copy = l.data().pb; +// } +// { +// MetadataLock<Metadata> l(&m, LockMode::WRITE); +// l.mutable_data()->pb = new_pb; +// l.Commit(); +// } +template<class MetadataClass> +class MetadataLock : public CowLock<typename MetadataClass::cow_state> { + public: + typedef CowLock<typename MetadataClass::cow_state> super; + MetadataLock() + : super() { + } + MetadataLock(MetadataClass* info, LockMode mode) + : super(DCHECK_NOTNULL(info)->mutable_metadata(), mode) { + } + MetadataLock(const MetadataClass* info, LockMode mode) + : super(&(DCHECK_NOTNULL(info))->metadata(), mode) { + } +}; + } // namespace kudu
