This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f08d524ac73dbbac6962a8fa857eaaa5701a6809 Author: Andrew Wong <[email protected]> AuthorDate: Tue Jun 30 12:09:46 2020 -0700 KUDU-2612 p3: tserver mechanism to create txn status tablets This introduces the concept of a table type in tablets. This is used in the context of creating transaction status table partitions; if a tablet replica is created as a partition of a transaction status table, the underlying replica will initialize some state to manage and coordinate transactions -- namely, a TxnStatusManager. For the sake of decoupling submodules, to get a TabletReplica to initialize a TxnStatusManager, this patch introduces the tablet::TxnCoordinator and tablet::TxnCoordinatorFactory interfaces that TxnStatusManager and the new TxnStatusManagerFactory implement respectively. The TxnStatusManagerFactory can be created by members of the tserver and passed to TabletReplicas upon initialization -- this layer of indirection will allow us to use tserver-wide state (e.g. in the future, a system client) without muddying the tablet subdirectory too much. This approach lies in contrast to the approach used for the CatalogManager, in which the master server owns a CatalogManager that owns the underlying SysCatalogTable and TabletReplica. I went down this route because, unlike the CatalogManager replicas, I expect the replicas of TxnStatusTablets to be dynamically moved around, and so it behooves us to reuse as much of the existing tserver replica management code as possible. To that end, the ownership relationship between the management state and the underlying tablet replica is flipped, with the TabletReplica owning the TxnStatusManager. The plumbing only extends through the tablet servers -- the ability to create transaction status tables and define partitioning is not yet plumbed into the master. Additionally, there is currently no means to restrict calls to the TxnStatusManagers whose underlying replicas are running leaders -- that will also come in later patches. 
Change-Id: Ib429f055e12944fa930f3e95ec4f2504466d3d02 Reviewed-on: http://gerrit.cloudera.org:8080/16116 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- src/kudu/common/common.proto | 11 ++ .../integration-tests/ts_tablet_manager-itest.cc | 182 +++++++++++++++++++++ src/kudu/master/sys_catalog.cc | 2 + src/kudu/tablet/metadata.proto | 4 + src/kudu/tablet/tablet-harness.h | 1 + src/kudu/tablet/tablet_bootstrap-test.cc | 1 + src/kudu/tablet/tablet_metadata.cc | 24 ++- src/kudu/tablet/tablet_metadata.h | 12 +- src/kudu/tablet/tablet_replica-test-base.cc | 1 + src/kudu/tablet/tablet_replica.cc | 12 ++ src/kudu/tablet/tablet_replica.h | 17 +- src/kudu/tablet/txn_coordinator.h | 74 +++++++++ src/kudu/tools/kudu-tool-test.cc | 2 + src/kudu/transactions/CMakeLists.txt | 1 - src/kudu/transactions/txn_status_manager-test.cc | 2 + src/kudu/transactions/txn_status_manager.cc | 1 + src/kudu/transactions/txn_status_manager.h | 36 ++-- src/kudu/transactions/txn_status_tablet.cc | 56 ++++--- src/kudu/transactions/txn_status_tablet.h | 10 ++ src/kudu/tserver/CMakeLists.txt | 1 + src/kudu/tserver/mini_tablet_server.cc | 2 +- src/kudu/tserver/tablet_copy_client.cc | 26 +-- .../tserver/tablet_copy_source_session-test.cc | 1 + src/kudu/tserver/tablet_server-test.cc | 2 +- src/kudu/tserver/tablet_service.cc | 1 + src/kudu/tserver/ts_tablet_manager-test.cc | 1 + src/kudu/tserver/ts_tablet_manager.cc | 9 + src/kudu/tserver/ts_tablet_manager.h | 3 +- src/kudu/tserver/tserver_admin.proto | 4 + 29 files changed, 442 insertions(+), 57 deletions(-) diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index 5a8da7e..d3d51f7 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -446,3 +446,14 @@ message TableExtraConfigPB { // calculate maintenance priority score. optional int32 maintenance_priority = 2; } + +// The type of a given table. 
This is useful in determining whether a +// table/tablet stores user-specified data, as opposed to being a Kudu-internal +// system table. +enum TableTypePB { + // The table stores user data. + DEFAULT_TABLE = 0; + + // The table stores transaction status management metadata. + TXN_STATUS_TABLE = 1; +} diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc index fd8acbc..d10e18f 100644 --- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc +++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc @@ -37,13 +37,16 @@ #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep #include "kudu/client/write_op.h" +#include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" +#include "kudu/common/partition.h" #include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/quorum_util.h" #include "kudu/consensus/raft_consensus.h" +#include "kudu/fs/fs_manager.h" #include "kudu/gutil/basictypes.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" @@ -63,12 +66,16 @@ #include "kudu/rpc/rpc_controller.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/tablet_replica.h" +#include "kudu/tablet/txn_coordinator.h" +#include "kudu/transactions/txn_status_tablet.h" #include "kudu/tserver/heartbeater.h" #include "kudu/tserver/mini_tablet_server.h" #include "kudu/tserver/tablet_server.h" #include "kudu/tserver/tablet_server_options.h" #include "kudu/tserver/ts_tablet_manager.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/tserver/tserver_admin.pb.h" +#include "kudu/tserver/tserver_admin.proxy.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/jsonwriter.h" #include "kudu/util/metrics.h" @@ -111,6 +118,9 @@ using kudu::consensus::RaftConfigPB; using kudu::consensus::RaftConsensus; using 
kudu::consensus::RaftPeerPB; using kudu::itest::SimpleIntKeyKuduSchema; +using kudu::itest::StartTabletCopy; +using kudu::itest::TabletServerMap; +using kudu::itest::TServerDetails; using kudu::KuduPartialRow; using kudu::master::CatalogManager; using kudu::master::GetTableLocationsResponsePB; @@ -120,9 +130,14 @@ using kudu::master::MasterServiceProxy; using kudu::master::ReportedTabletPB; using kudu::master::TableInfo; using kudu::master::TabletReportPB; +using kudu::pb_util::SecureDebugString; using kudu::rpc::Messenger; using kudu::rpc::MessengerBuilder; +using kudu::rpc::RpcController; using kudu::tablet::TabletReplica; +using kudu::tablet::ParticipantIdsByTxnId; +using kudu::tablet::TxnCoordinator; +using kudu::transactions::TxnStatusTablet; using kudu::tserver::MiniTabletServer; using kudu::ClusterVerifier; using std::map; @@ -978,5 +993,172 @@ TEST_F(TsTabletManagerITest, TestDeleteTableDuringTabletCopy) { }); } +namespace { + +Status GetPartitionForTxnStatusTablet(int64_t start_txn_id, int64_t end_txn_id, + PartitionSchema* partition_schema, + Partition* partition) { + const auto& schema = TxnStatusTablet::GetSchema(); + // Add range partitioning on the transaction ID column. + PartitionSchemaPB partition_pb; + auto* range_schema = partition_pb.mutable_range_schema(); + range_schema->add_columns()->set_name(TxnStatusTablet::kTxnIdColName); + PartitionSchema pschema; + RETURN_NOT_OK(PartitionSchema::FromPB(partition_pb, schema, &pschema)); + + // Create some bounds for the partition. 
+ KuduPartialRow lower_bound(&schema); + KuduPartialRow upper_bound(&schema); + RETURN_NOT_OK(lower_bound.SetInt64(TxnStatusTablet::kTxnIdColName, start_txn_id)); + RETURN_NOT_OK(upper_bound.SetInt64(TxnStatusTablet::kTxnIdColName, end_txn_id)); + vector<Partition> ps; + RETURN_NOT_OK(pschema.CreatePartitions(/*split_rows=*/{}, + { std::make_pair(lower_bound, upper_bound) }, schema, &ps)); + *partition = ps[0]; + *partition_schema = pschema; + return Status::OK(); +} + +} // anonymous namespace + +class TxnStatusTabletManagementTest : public TsTabletManagerITest { + public: + static constexpr const char* kOwner = "jojo"; + const char* kParticipant = "participant"; + const char* kTxnStatusTabletId = "11111111111111111111111111111111"; + const MonoDelta kTimeout = MonoDelta::FromSeconds(30); + + // Creates a request to create a transaction status tablet with the given IDs + // and Raft config. + CreateTabletRequestPB CreateTxnTabletReq(const string& tablet_id, const string& replica_id, + RaftConfigPB raft_config) { + CreateTabletRequestPB req; + req.set_table_type(TableTypePB::TXN_STATUS_TABLE); + req.set_dest_uuid(replica_id); + req.set_table_id("txn_status_table"); + req.set_table_name("txn_status_table"); + req.set_tablet_id(tablet_id); + *req.mutable_config() = std::move(raft_config); + CHECK_OK(SchemaToPB(TxnStatusTablet::GetSchema(), req.mutable_schema())); + return req; + } + + // Creates a transaction status tablet at the given tablet server. + Status CreateTxnStatusTablet(MiniTabletServer* ts) { + CreateTabletRequestPB req = CreateTxnTabletReq( + kTxnStatusTabletId, ts->server()->fs_manager()->uuid(), ts->CreateLocalConfig()); + CreateTabletResponsePB resp; + rpc::RpcController rpc; + + // Put together a partition spec for this tablet. 
+ PartitionSchema partition_schema; + Partition partition; + RETURN_NOT_OK(GetPartitionForTxnStatusTablet(0, 100, &partition_schema, &partition)); + partition.ToPB(req.mutable_partition()); + + unique_ptr<TabletServerAdminServiceProxy> admin_proxy( + new TabletServerAdminServiceProxy(client_messenger_, ts->bound_rpc_addr(), + ts->bound_rpc_addr().host())); + RETURN_NOT_OK(admin_proxy->CreateTablet(req, &resp, &rpc)); + scoped_refptr<TabletReplica> r; + CHECK(ts->server()->tablet_manager()->LookupTablet(kTxnStatusTabletId, &r)); + return r->consensus()->WaitUntilLeaderForTests(kTimeout); + } + + Status StartTransactions(const ParticipantIdsByTxnId& txns, TxnCoordinator* coordinator) { + for (const auto& txn_id_and_prt_ids : txns) { + const auto& txn_id = txn_id_and_prt_ids.first; + RETURN_NOT_OK(coordinator->BeginTransaction(txn_id, kOwner)); + for (const auto& prt_id : txn_id_and_prt_ids.second) { + RETURN_NOT_OK(coordinator->RegisterParticipant(txn_id, prt_id, kOwner)); + } + } + return Status::OK(); + } +}; + +TEST_F(TxnStatusTabletManagementTest, TestBootstrapTransactionStatusTablet) { + NO_FATALS(StartCluster({})); + auto* ts0 = cluster_->mini_tablet_server(0); + ASSERT_OK(CreateTxnStatusTablet(ts0)); + const ParticipantIdsByTxnId kExpectedTxns = { + { 1, { kParticipant } }, + { 2, {} }, + }; + { + // Ensure the replica has a coordinator. + scoped_refptr<TabletReplica> replica; + ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( + kTxnStatusTabletId, &replica)); + ASSERT_OK(replica->WaitUntilConsensusRunning(kTimeout)); + TxnCoordinator* coordinator = replica->txn_coordinator(); + ASSERT_NE(nullptr, coordinator); + + // Create a transaction and participant. + ASSERT_OK(StartTransactions(kExpectedTxns, coordinator)); + } + // Restart the tablet server, reload the transaction status tablet, and + // ensure we have the expected state. 
+ ts0->Shutdown(); + ASSERT_OK(ts0->Restart()); + { + scoped_refptr<TabletReplica> replica; + ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( + kTxnStatusTabletId, &replica)); + TxnCoordinator* coordinator = replica->txn_coordinator(); + ASSERT_NE(nullptr, coordinator); + // Wait for the contents of the tablet to be loaded into memory. + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(kExpectedTxns, coordinator->GetParticipantsByTxnIdForTests()); + }); + } +} + +TEST_F(TxnStatusTabletManagementTest, TestCopyTransactionStatusTablet) { + InternalMiniClusterOptions opts; + opts.num_tablet_servers = 2; + NO_FATALS(StartCluster(std::move(opts))); + const ParticipantIdsByTxnId kExpectedTxns = { + { 1, { kParticipant } }, + { 2, {} }, + }; + + auto* ts0 = cluster_->mini_tablet_server(0); + ASSERT_OK(CreateTxnStatusTablet(ts0)); + { + // Ensure the replica has a coordinator. + scoped_refptr<TabletReplica> replica; + ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( + kTxnStatusTabletId, &replica)); + ASSERT_OK(replica->WaitUntilConsensusRunning(kTimeout)); + TxnCoordinator* coordinator = replica->txn_coordinator(); + ASSERT_NE(nullptr, coordinator); + + // Create a transaction and participant. 
+ ASSERT_OK(StartTransactions(kExpectedTxns, coordinator)); + } + TabletServerMap ts_map; + ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), client_messenger_, &ts_map)); + ValueDeleter deleter(&ts_map); + auto* ts1 = cluster_->mini_tablet_server(1); + TServerDetails* src_details = ts_map[ts0->uuid()]; + TServerDetails* dst_details = ts_map[ts1->uuid()]; + HostPort src_addr = HostPortFromPB(src_details->registration.rpc_addresses(0)); + ASSERT_OK(StartTabletCopy(dst_details, kTxnStatusTabletId, src_details->uuid(), + src_addr, std::numeric_limits<int64_t>::max(), kTimeout)); + { + scoped_refptr<TabletReplica> replica; + ASSERT_TRUE(ts1->server()->tablet_manager()->LookupTablet( + kTxnStatusTabletId, &replica)); + ASSERT_OK(replica->WaitUntilConsensusRunning(MonoDelta::FromSeconds(3))); + TxnCoordinator* coordinator = replica->txn_coordinator(); + ASSERT_NE(nullptr, coordinator); + // Wait for the contents of the tablet to be loaded into memory. + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(kExpectedTxns, coordinator->GetParticipantsByTxnIdForTests()); + }); + } +} + } // namespace tserver } // namespace kudu diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index e32d6c3..f19e5cf 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -273,6 +273,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) { /*supports_live_row_count=*/ true, /*extra_config=*/ boost::none, /*dimension_label=*/ boost::none, + /*table_type=*/ boost::none, &metadata)); RaftConfigPB config; @@ -394,6 +395,7 @@ Status SysCatalogTable::SetupTablet( cmeta_manager_, local_peer_pb_, master_->tablet_apply_pool(), + /*txn_coordinator_factory*/ nullptr, [this, tablet_id](const string& reason) { this->SysCatalogStateChanged(tablet_id, reason); })); diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto index e3fa281..9c58636 100644 --- a/src/kudu/tablet/metadata.proto +++ b/src/kudu/tablet/metadata.proto @@ -89,6 
+89,10 @@ message TabletSuperBlockPB { // Tablet Id required bytes tablet_id = 2; + // The type of table this tablet belongs to. If not set, the assumption is + // this is a user-defined table as opposed to a Kudu-internal system table. + optional TableTypePB table_type = 19; + // The latest durable MemRowSet id required int64 last_durable_mrs_id = 3; diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h index 346155d..b026554 100644 --- a/src/kudu/tablet/tablet-harness.h +++ b/src/kudu/tablet/tablet-harness.h @@ -102,6 +102,7 @@ class TabletHarness { /*tombstone_last_logged_opid=*/ boost::none, /*extra_config=*/ boost::none, /*dimension_label=*/ boost::none, + /*table_type=*/ boost::none, &metadata)); metrics_registry_.reset(new MetricRegistry); metric_entity_ = METRIC_ENTITY_server.Instantiate(metrics_registry_.get(), diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc index 257dfd2..c800bcd 100644 --- a/src/kudu/tablet/tablet_bootstrap-test.cc +++ b/src/kudu/tablet/tablet_bootstrap-test.cc @@ -119,6 +119,7 @@ class BootstrapTest : public LogTestBase { /*tombstone_last_logged_opid=*/ boost::none, /*extra_config=*/ boost::none, /*dimension_label=*/ boost::none, + /*table_type=*/ boost::none, meta)); (*meta)->SetLastDurableMrsIdForTests(mrs_id); if ((*meta)->GetRowSetForTests(0) != nullptr) { diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index 297ff89..c4edd3f 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -93,6 +93,7 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager, bool supports_live_row_count, boost::optional<TableExtraConfigPB> extra_config, boost::optional<string> dimension_label, + boost::optional<TableTypePB> table_type, scoped_refptr<TabletMetadata>* metadata) { // Verify that no existing tablet exists with the same ID. 
@@ -116,7 +117,8 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager, std::move(tombstone_last_logged_opid), supports_live_row_count, std::move(extra_config), - std::move(dimension_label))); + std::move(dimension_label), + std::move(table_type))); RETURN_NOT_OK(ret->Flush()); dir_group_cleanup.cancel(); @@ -144,6 +146,7 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager, boost::optional<OpId> tombstone_last_logged_opid, boost::optional<TableExtraConfigPB> extra_config, boost::optional<string> dimension_label, + boost::optional<TableTypePB> table_type, scoped_refptr<TabletMetadata>* metadata) { Status s = Load(fs_manager, tablet_id, metadata); if (s.ok()) { @@ -161,6 +164,7 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager, /*supports_live_row_count=*/ true, std::move(extra_config), std::move(dimension_label), + std::move(table_type), metadata); } return s; @@ -281,7 +285,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id, boost::optional<OpId> tombstone_last_logged_opid, bool supports_live_row_count, boost::optional<TableExtraConfigPB> extra_config, - boost::optional<string> dimension_label) + boost::optional<string> dimension_label, + boost::optional<TableTypePB> table_type) : state_(kNotWrittenYet), tablet_id_(std::move(tablet_id)), table_id_(std::move(table_id)), @@ -297,6 +302,7 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id, tombstone_last_logged_opid_(std::move(tombstone_last_logged_opid)), extra_config_(std::move(extra_config)), dimension_label_(std::move(dimension_label)), + table_type_(std::move(table_type)), num_flush_pins_(0), needs_flush_(false), flush_count_for_tests_(0), @@ -475,6 +481,10 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) } else { dimension_label_ = boost::none; } + + if (superblock.has_table_type() && superblock.table_type() != TableTypePB::DEFAULT_TABLE) { + table_type_ = superblock.table_type(); + } } // Now is a good time 
to clean up any orphaned blocks that may have been @@ -724,6 +734,11 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, pb.set_dimension_label(*dimension_label_); } + if (table_type_) { + DCHECK_NE(TableTypePB::DEFAULT_TABLE, *table_type_); + pb.set_table_type(*table_type_); + } + super_block->Swap(&pb); return Status::OK(); } @@ -824,5 +839,10 @@ boost::optional<string> TabletMetadata::dimension_label() const { return dimension_label_; } +const boost::optional<TableTypePB>& TabletMetadata::table_type() const { + std::lock_guard<LockType> l(data_lock_); + return table_type_; +} + } // namespace tablet } // namespace kudu diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index 6700c87..8b26887 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -26,6 +26,7 @@ #include <boost/optional/optional.hpp> #include <glog/logging.h> +#include "kudu/common/common.pb.h" #include "kudu/common/partition.h" #include "kudu/fs/block_id.h" #include "kudu/gutil/atomicops.h" @@ -42,7 +43,6 @@ namespace kudu { class BlockIdPB; class FsManager; class Schema; -class TableExtraConfigPB; namespace consensus { class OpId; @@ -83,6 +83,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { bool supports_live_row_count, boost::optional<TableExtraConfigPB> extra_config, boost::optional<std::string> dimension_label, + boost::optional<TableTypePB> table_type, scoped_refptr<TabletMetadata>* metadata); // Load existing metadata from disk. 
@@ -106,6 +107,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { boost::optional<consensus::OpId> tombstone_last_logged_opid, boost::optional<TableExtraConfigPB> extra_config, boost::optional<std::string> dimension_label, + boost::optional<TableTypePB> table_type, scoped_refptr<TabletMetadata>* metadata); static std::vector<BlockIdPB> CollectBlockIdPBs( @@ -128,6 +130,8 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { return table_id_; } + const boost::optional<TableTypePB>& table_type() const; + std::string table_name() const; uint32_t schema_version() const; @@ -304,7 +308,8 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { boost::optional<consensus::OpId> tombstone_last_logged_opid, bool supports_live_row_count, boost::optional<TableExtraConfigPB> extra_config, - boost::optional<std::string> dimension_label); + boost::optional<std::string> dimension_label, + boost::optional<TableTypePB> table_type); // Constructor for loading an existing tablet. TabletMetadata(FsManager* fs_manager, std::string tablet_id); @@ -398,6 +403,9 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { // Tablet's dimension label. boost::optional<std::string> dimension_label_; + // The table type of the table this tablet belongs to. + boost::optional<TableTypePB> table_type_; + // If this counter is > 0 then Flush() will not write any data to // disk. 
int32_t num_flush_pins_; diff --git a/src/kudu/tablet/tablet_replica-test-base.cc b/src/kudu/tablet/tablet_replica-test-base.cc index 4cac790..3e0fb8a 100644 --- a/src/kudu/tablet/tablet_replica-test-base.cc +++ b/src/kudu/tablet/tablet_replica-test-base.cc @@ -112,6 +112,7 @@ Status TabletReplicaTestBase::SetUpReplica(bool new_replica) { cmeta_manager_, *config_peer, apply_pool_.get(), + /*txn_coordinator_factory*/nullptr, [tablet_id] (const string& reason) { LOG(INFO) << Substitute( "state change callback run for $0: $1", tablet_id, reason); diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index 4e9279b..63b9a22 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -25,8 +25,10 @@ #include <string> #include <vector> +#include <boost/optional/optional.hpp> #include <glog/logging.h> +#include "kudu/common/common.pb.h" #include "kudu/common/partition.h" #include "kudu/common/timestamp.h" #include "kudu/consensus/consensus.pb.h" @@ -49,6 +51,7 @@ #include "kudu/tablet/ops/write_op.h" #include "kudu/tablet/tablet.pb.h" #include "kudu/tablet/tablet_replica_mm_ops.h" +#include "kudu/tablet/txn_coordinator.h" #include "kudu/util/logging.h" #include "kudu/util/maintenance_manager.h" #include "kudu/util/metrics.h" @@ -130,12 +133,16 @@ TabletReplica::TabletReplica( scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager, consensus::RaftPeerPB local_peer_pb, ThreadPool* apply_pool, + TxnCoordinatorFactory* txn_coordinator_factory, MarkDirtyCallback cb) : meta_(DCHECK_NOTNULL(std::move(meta))), cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))), local_peer_pb_(std::move(local_peer_pb)), log_anchor_registry_(new LogAnchorRegistry()), apply_pool_(apply_pool), + txn_coordinator_(meta_->table_type() && + *meta_->table_type() == TableTypePB::TXN_STATUS_TABLE ? 
+ DCHECK_NOTNULL(txn_coordinator_factory)->Create(this) : nullptr), mark_dirty_clbk_(std::move(cb)), state_(NOT_INITIALIZED), last_status_("Tablet initializing...") { @@ -252,6 +259,11 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info, CHECK_EQ(BOOTSTRAPPING, state_); // We are still protected by 'state_change_lock_'. set_state(RUNNING); } + // TODO(awong): hook a callback into the TxnStatusManager that runs this when + // we become leader such that only leaders load the tablet into memory. + if (txn_coordinator_) { + RETURN_NOT_OK(txn_coordinator_->LoadFromTablet()); + } return Status::OK(); } diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index aa52910..35bcd2d 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -53,8 +53,8 @@ class ThreadPoolToken; namespace consensus { class ConsensusMetadataManager; -class TimeManager; class OpStatusPB; +class TimeManager; } namespace clock { @@ -72,9 +72,10 @@ class ResultTracker; namespace tablet { class AlterSchemaOpState; -class TabletReplicaTestBase; -class TabletStatusPB; class OpDriver; +class TabletStatusPB; +class TxnCoordinator; +class TxnCoordinatorFactory; class WriteOpState; // A replica in a tablet consensus configuration, which coordinates writes to tablets. @@ -89,6 +90,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>, scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager, consensus::RaftPeerPB local_peer_pb, ThreadPool* apply_pool, + TxnCoordinatorFactory* txn_coordinator_factory, consensus::MarkDirtyCallback cb); // Initializes RaftConsensus. @@ -310,6 +312,10 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>, // Return the tablet stats. 
ReportedTabletStatsPB GetTabletStats() const; + TxnCoordinator* txn_coordinator() const { + return txn_coordinator_.get(); + } + private: friend class kudu::AlterTableTest; friend class RefCountedThreadSafe<TabletReplica>; @@ -342,6 +348,11 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>, // Tablet server. ThreadPool* const apply_pool_; + // If this tablet is a part of the transaction status table, this is the + // entity responsible for accepting and managing requests to coordinate + // transactions. + const std::unique_ptr<TxnCoordinator> txn_coordinator_; + // Function to mark this TabletReplica's tablet as dirty in the TSTabletManager. // // Must be called whenever cluster membership or leadership changes, or when diff --git a/src/kudu/tablet/txn_coordinator.h b/src/kudu/tablet/txn_coordinator.h new file mode 100644 index 0000000..861d941 --- /dev/null +++ b/src/kudu/tablet/txn_coordinator.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <map> +#include <memory> +#include <string> +#include <vector> + +#include "kudu/util/status.h" + +namespace kudu { +namespace tablet { + +class TabletReplica; + +// Maps the transaction ID to the transaction's participants' tablet IDs. This +// is convenient to use in testing, given its relative ease of construction. +typedef std::map<int64_t, std::vector<std::string>> ParticipantIdsByTxnId; + +// Manages ongoing transactions and participants thereof. +class TxnCoordinator { + public: + virtual ~TxnCoordinator() {} + + virtual Status LoadFromTablet() = 0; + + // Starts a transaction with the given ID as the given user. + virtual Status BeginTransaction(int64_t txn_id, const std::string& user) = 0; + + // Begins committing the given transaction as the given user. + virtual Status BeginCommitTransaction(int64_t txn_id, const std::string& user) = 0; + + // Finalizes the commit of the transaction. + // TODO(awong): add a commit timestamp. + virtual Status FinalizeCommitTransaction(int64_t txn_id) = 0; + + // Aborts the given transaction as the given user. + virtual Status AbortTransaction(int64_t txn_id, const std::string& user) = 0; + + // Registers a participant tablet ID to the given transaction ID as the given + // user. + virtual Status RegisterParticipant(int64_t txn_id, const std::string& tablet_id, + const std::string& user) = 0; + + // Populates a map from transaction ID to the list of participants associated + // with that transaction ID. + virtual ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const = 0; + + // The highest transaction ID seen by this coordinator. 
+ virtual int64_t highest_txn_id() const = 0; +}; + +class TxnCoordinatorFactory { + public: + virtual std::unique_ptr<TxnCoordinator> Create(TabletReplica* replica) = 0; +}; + +} // namespace tablet +} // namespace kudu diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 427f1a7..c2b6a7d 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -1720,6 +1720,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) { /*supports_live_row_count=*/ true, /*extra_config=*/ boost::none, /*dimension_label=*/ boost::none, + /*table_type=*/ boost::none, &meta)); string stdout; NO_FATALS(RunActionStdoutString(Substitute("local_replica dump data_dirs $0 " @@ -1757,6 +1758,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) { /*supports_live_row_count=*/ true, /*extra_config=*/ boost::none, /*dimension_label=*/ boost::none, + /*table_type=*/ boost::none, &meta); string stdout; NO_FATALS(RunActionStdoutString(Substitute("local_replica dump meta $0 " diff --git a/src/kudu/transactions/CMakeLists.txt b/src/kudu/transactions/CMakeLists.txt index c5b3e89..65a5acd 100644 --- a/src/kudu/transactions/CMakeLists.txt +++ b/src/kudu/transactions/CMakeLists.txt @@ -39,7 +39,6 @@ target_link_libraries(transactions kudu_common tablet transactions_proto - tserver ${KUDU_BASE_LIBS} ) diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc index c912e9c..9e1e8c7 100644 --- a/src/kudu/transactions/txn_status_manager-test.cc +++ b/src/kudu/transactions/txn_status_manager-test.cc @@ -39,6 +39,7 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/tablet-test-util.h" #include "kudu/tablet/tablet_replica-test-base.h" +#include "kudu/tablet/txn_coordinator.h" #include "kudu/transactions/transactions.pb.h" #include "kudu/transactions/txn_status_tablet.h" #include "kudu/util/barrier.h" @@ -52,6 +53,7 @@ #include "kudu/util/test_util.h" using 
kudu::consensus::ConsensusBootstrapInfo; +using kudu::tablet::ParticipantIdsByTxnId; using kudu::tablet::TabletReplicaTestBase; using std::string; using std::thread; diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc index a7d85ed..737ed47 100644 --- a/src/kudu/transactions/txn_status_manager.cc +++ b/src/kudu/transactions/txn_status_manager.cc @@ -34,6 +34,7 @@ #include "kudu/util/status.h" using kudu::pb_util::SecureShortDebugString; +using kudu::tablet::ParticipantIdsByTxnId; using std::string; using std::vector; using strings::Substitute; diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h index 8162c13..e5d8e7b 100644 --- a/src/kudu/transactions/txn_status_manager.h +++ b/src/kudu/transactions/txn_status_manager.h @@ -17,7 +17,7 @@ #pragma once #include <cstdint> -#include <map> +#include <memory> #include <mutex> #include <string> #include <unordered_map> @@ -26,6 +26,7 @@ #include <boost/optional/optional.hpp> #include "kudu/gutil/ref_counted.h" +#include "kudu/tablet/txn_coordinator.h" #include "kudu/transactions/txn_status_entry.h" #include "kudu/transactions/txn_status_tablet.h" #include "kudu/util/locks.h" @@ -44,10 +45,6 @@ class TxnStatusEntryPB; // Maps the transaction ID to the corresponding TransactionEntry. typedef std::unordered_map<int64_t, scoped_refptr<TransactionEntry>> TransactionsMap; -// Maps the transaction ID to the transaction's participants' tablet IDs. This -// is convenient to use in testing, given its relative ease of construction. -typedef std::map<int64_t, std::vector<std::string>> ParticipantIdsByTxnId; - // Visitor used to iterate over and load into memory the existing state from a // status tablet. 
class TxnStatusManagerBuildingVisitor : public TransactionsVisitor { @@ -68,13 +65,15 @@ class TxnStatusManagerBuildingVisitor : public TransactionsVisitor { // Manages ongoing transactions and participants therein, backed by an // underlying tablet. -class TxnStatusManager { +class TxnStatusManager : public tablet::TxnCoordinator { public: explicit TxnStatusManager(tablet::TabletReplica* tablet_replica) : highest_txn_id_(-1), status_tablet_(tablet_replica) {} + virtual ~TxnStatusManager() {} + // Loads the contents of the status tablet into memory. - Status LoadFromTablet(); + Status LoadFromTablet() override; // Writes an entry to the status tablet and creates a transaction in memory. // Returns an error if a higher transaction ID has already been attempted @@ -84,11 +83,11 @@ class TxnStatusManager { // TODO(awong): consider computing the next available transaction ID in this // partition and using it in case this transaction is already used, or having // callers forward a request for the next-highest transaction ID. - Status BeginTransaction(int64_t txn_id, const std::string& user); + Status BeginTransaction(int64_t txn_id, const std::string& user) override; // Begins committing the given transaction, returning an error if the // transaction doesn't exist, isn't open, or isn't owned by the given user. - Status BeginCommitTransaction(int64_t txn_id, const std::string& user); + Status BeginCommitTransaction(int64_t txn_id, const std::string& user) override; // Finalizes the commit of the transaction, returning an error if the // transaction isn't in an appropraite state. @@ -97,12 +96,12 @@ class TxnStatusManager { // so it doesn't take a user. // // TODO(awong): add a commit timestamp. - Status FinalizeCommitTransaction(int64_t txn_id); + Status FinalizeCommitTransaction(int64_t txn_id) override; // Aborts the given transaction, returning an error if the transaction // doesn't exist, is committed or not yet opened, or isn't owned by the given // user. 
- Status AbortTransaction(int64_t txn_id, const std::string& user); + Status AbortTransaction(int64_t txn_id, const std::string& user) override; // Creates an in-memory participant, writes an entry to the status table, and // attaches the in-memory participant to the transaction. @@ -110,13 +109,13 @@ class TxnStatusManager { // If the transaction is open, it is ensured to be active for the duration of // this call. Returns an error if the given transaction isn't open. Status RegisterParticipant(int64_t txn_id, const std::string& tablet_id, - const std::string& user); + const std::string& user) override; // Populates a map from transaction ID to the sorted list of participants // associated with that transaction ID. - ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const; + tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override; - int64_t highest_txn_id() const { + int64_t highest_txn_id() const override { std::lock_guard<simple_spinlock> l(lock_); return highest_txn_id_; } @@ -142,5 +141,14 @@ class TxnStatusManager { TxnStatusTablet status_tablet_; }; +class TxnStatusManagerFactory : public tablet::TxnCoordinatorFactory { + public: + TxnStatusManagerFactory() {} + + std::unique_ptr<tablet::TxnCoordinator> Create(tablet::TabletReplica* replica) override { + return std::unique_ptr<tablet::TxnCoordinator>(new TxnStatusManager(replica)); + } +}; + } // namespace transactions } // namespace kudu diff --git a/src/kudu/transactions/txn_status_tablet.cc b/src/kudu/transactions/txn_status_tablet.cc index 968454b..337c659 100644 --- a/src/kudu/transactions/txn_status_tablet.cc +++ b/src/kudu/transactions/txn_status_tablet.cc @@ -67,11 +67,6 @@ namespace transactions { namespace { -const char* kTxnIdColName = "txn_id"; -const char* kEntryTypeColName = "entry_type"; -const char* kIdentifierColName = "identifier"; -const char* kMetadataColName = "metadata"; - int kTxnIdColIdx = -1; int kEntryTypeColIdx = -1; int kIdentifierColIdx = -1; @@ -81,10 
+76,10 @@ Status InitTxnStatusColIdxs() { static KuduOnceLambda col_idx_initializer; return col_idx_initializer.Init([] { const auto& schema = TxnStatusTablet::GetSchemaWithoutIds(); - kTxnIdColIdx = schema.find_column(kTxnIdColName); - kEntryTypeColIdx = schema.find_column(kEntryTypeColName); - kIdentifierColIdx = schema.find_column(kIdentifierColName); - kMetadataColIdx = schema.find_column(kMetadataColName); + kTxnIdColIdx = schema.find_column(TxnStatusTablet::kTxnIdColName); + kEntryTypeColIdx = schema.find_column(TxnStatusTablet::kEntryTypeColName); + kIdentifierColIdx = schema.find_column(TxnStatusTablet::kIdentifierColName); + kMetadataColIdx = schema.find_column(TxnStatusTablet::kMetadataColName); return Status::OK(); }); } @@ -107,15 +102,25 @@ int MetadataColIdx() { return kMetadataColIdx; } +Schema kTxnStatusSchema; Schema kTxnStatusSchemaNoIds; // Populates the schema of the transaction status table. Status PopulateTxnStatusSchema(SchemaBuilder* builder) { - RETURN_NOT_OK(builder->AddKeyColumn(kTxnIdColName, INT64)); - RETURN_NOT_OK(builder->AddKeyColumn(kEntryTypeColName, INT8)); - RETURN_NOT_OK(builder->AddKeyColumn(kIdentifierColName, STRING)); - return builder->AddColumn(kMetadataColName, STRING); + RETURN_NOT_OK(builder->AddKeyColumn(TxnStatusTablet::kTxnIdColName, INT64)); + RETURN_NOT_OK(builder->AddKeyColumn(TxnStatusTablet::kEntryTypeColName, INT8)); + RETURN_NOT_OK(builder->AddKeyColumn(TxnStatusTablet::kIdentifierColName, STRING)); + return builder->AddColumn(TxnStatusTablet::kMetadataColName, STRING); } // Initializes the static transaction status schema. 
+Status InitTxnStatusSchemaOnce() { + static KuduOnceLambda schema_initializer; + return schema_initializer.Init([] { + SchemaBuilder builder; + RETURN_NOT_OK(PopulateTxnStatusSchema(&builder)); + kTxnStatusSchema = builder.Build(); + return Status::OK(); + }); +} Status InitTxnStatusSchemaWithNoIdsOnce() { static KuduOnceLambda schema_initializer; return schema_initializer.Init([] { @@ -171,27 +176,36 @@ Status ExtractMetadataEntry(const RowBlockRow& row, T* pb) { } Status PopulateTransactionEntryRow(int64_t txn_id, const faststring& entry, KuduPartialRow* row) { - RETURN_NOT_OK(row->SetInt64(kTxnIdColName, txn_id)); - RETURN_NOT_OK(row->SetInt8(kEntryTypeColName, TxnStatusTablet::TRANSACTION)); - RETURN_NOT_OK(row->SetString(kIdentifierColName, "")); - return row->SetString(kMetadataColName, entry); + RETURN_NOT_OK(row->SetInt64(TxnStatusTablet::kTxnIdColName, txn_id)); + RETURN_NOT_OK(row->SetInt8(TxnStatusTablet::kEntryTypeColName, TxnStatusTablet::TRANSACTION)); + RETURN_NOT_OK(row->SetString(TxnStatusTablet::kIdentifierColName, "")); + return row->SetString(TxnStatusTablet::kMetadataColName, entry); } Status PopulateParticipantEntryRow(int64_t txn_id, const string& tablet_id, const faststring& entry, KuduPartialRow* row) { - RETURN_NOT_OK(row->SetInt64(kTxnIdColName, txn_id)); - RETURN_NOT_OK(row->SetInt8(kEntryTypeColName, TxnStatusTablet::PARTICIPANT)); - RETURN_NOT_OK(row->SetString(kIdentifierColName, tablet_id)); - return row->SetString(kMetadataColName, entry); + RETURN_NOT_OK(row->SetInt64(TxnStatusTablet::kTxnIdColName, txn_id)); + RETURN_NOT_OK(row->SetInt8(TxnStatusTablet::kEntryTypeColName, TxnStatusTablet::PARTICIPANT)); + RETURN_NOT_OK(row->SetString(TxnStatusTablet::kIdentifierColName, tablet_id)); + return row->SetString(TxnStatusTablet::kMetadataColName, entry); } } // anonymous namespace +const char* const TxnStatusTablet::kTxnIdColName = "txn_id"; +const char* const TxnStatusTablet::kEntryTypeColName = "entry_type"; +const char* const 
TxnStatusTablet::kIdentifierColName = "identifier"; +const char* const TxnStatusTablet::kMetadataColName = "metadata"; + TxnStatusTablet::TxnStatusTablet(tablet::TabletReplica* tablet_replica) : tablet_replica_(DCHECK_NOTNULL(tablet_replica)) { CHECK_OK(InitTxnStatusColIdxs()); } +const Schema& TxnStatusTablet::GetSchema() { + CHECK_OK(InitTxnStatusSchemaOnce()); + return kTxnStatusSchema; +} const Schema& TxnStatusTablet::GetSchemaWithoutIds() { CHECK_OK(InitTxnStatusSchemaWithNoIdsOnce()); return kTxnStatusSchemaNoIds; diff --git a/src/kudu/transactions/txn_status_tablet.h b/src/kudu/transactions/txn_status_tablet.h index 1b9378f..db39ce2 100644 --- a/src/kudu/transactions/txn_status_tablet.h +++ b/src/kudu/transactions/txn_status_tablet.h @@ -82,6 +82,10 @@ class TransactionsVisitor { // TODO(awong): consider batching writes. class TxnStatusTablet { public: + static const char* const kTxnIdColName; + static const char* const kEntryTypeColName; + static const char* const kIdentifierColName; + static const char* const kMetadataColName; enum TxnStatusEntryType { TRANSACTION = 1, PARTICIPANT = 2, @@ -89,6 +93,7 @@ class TxnStatusTablet { explicit TxnStatusTablet(tablet::TabletReplica* tablet_replica); // Returns the schema of the transactions status table. + static const Schema& GetSchema(); static const Schema& GetSchemaWithoutIds(); // Uses the given visitor to iterate over the entries in the rows of the @@ -105,6 +110,11 @@ class TxnStatusTablet { const TxnParticipantEntryPB& pb); private: + friend class TxnStatusManager; + tablet::TabletReplica* tablet_replica() const { + return tablet_replica_; + } + // Writes 'req' to the underlying tablet replica, returning an error if there // was a problem replicating the request, or if there were any row errors. 
Status SyncWrite(const tserver::WriteRequestPB& req); diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt index c714547..40e1ac9 100644 --- a/src/kudu/tserver/CMakeLists.txt +++ b/src/kudu/tserver/CMakeLists.txt @@ -133,6 +133,7 @@ target_link_libraries(tserver server_process tablet tablet_copy_proto + transactions tserver_admin_proto tserver_proto tserver_service_proto) diff --git a/src/kudu/tserver/mini_tablet_server.cc b/src/kudu/tserver/mini_tablet_server.cc index bb9b782..8825e08 100644 --- a/src/kudu/tserver/mini_tablet_server.cc +++ b/src/kudu/tserver/mini_tablet_server.cc @@ -148,7 +148,7 @@ Status MiniTabletServer::AddTestTablet(const std::string& table_id, return server_->tablet_manager()->CreateNewTablet( table_id, tablet_id, partition.second, table_id, - schema_with_ids, partition.first, config, boost::none, boost::none, nullptr); + schema_with_ids, partition.first, config, boost::none, boost::none, boost::none, nullptr); } vector<string> MiniTabletServer::ListTablets() const { diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc index d5e4a2c..d783565 100644 --- a/src/kudu/tserver/tablet_copy_client.cc +++ b/src/kudu/tserver/tablet_copy_client.cc @@ -357,17 +357,21 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, // Create the superblock on disk. 
RETURN_NOT_OK(TabletMetadata::CreateNew(fs_manager_, tablet_id_, - superblock_->table_name(), - superblock_->table_id(), - schema, - partition_schema, - partition, - superblock_->tablet_data_state(), - superblock_->tombstone_last_logged_opid(), - remote_superblock_->supports_live_row_count(), - superblock_->extra_config(), - superblock_->dimension_label(), - &meta_)); + superblock_->table_name(), + superblock_->table_id(), + schema, + partition_schema, + partition, + superblock_->tablet_data_state(), + superblock_->tombstone_last_logged_opid(), + remote_superblock_->supports_live_row_count(), + superblock_->has_extra_config() ? + boost::make_optional(superblock_->extra_config()) : boost::none, + superblock_->has_dimension_label() ? + boost::make_optional(superblock_->dimension_label()) : boost::none, + superblock_->has_table_type() ? + boost::make_optional(superblock_->table_type()) : boost::none, + &meta_)); TRACE("Wrote new tablet metadata"); // We have begun persisting things to disk. 
Update the tablet copy state diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc index 27f3daa..45d327e 100644 --- a/src/kudu/tserver/tablet_copy_source_session-test.cc +++ b/src/kudu/tserver/tablet_copy_source_session-test.cc @@ -164,6 +164,7 @@ class TabletCopyTest : public KuduTabletTest { cmeta_manager, *config_peer, apply_pool_.get(), + nullptr, [this, tablet_id](const string& reason) { this->TabletReplicaStateChangedCallback(tablet_id, reason); })); diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 6c13f52..3e0c5c7 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -3837,7 +3837,7 @@ TEST_F(TabletServerTest, TestWriteOutOfBounds) { "TestWriteOutOfBoundsTable", tabletId, partitions[1], tabletId, schema, partition_schema, - mini_server_->CreateLocalConfig(), boost::none, boost::none, nullptr)); + mini_server_->CreateLocalConfig(), boost::none, boost::none, boost::none, nullptr)); ASSERT_OK(WaitForTabletRunning(tabletId)); diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 1625288..d22759d 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -1243,6 +1243,7 @@ void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req, req->config(), req->has_extra_config() ? boost::make_optional(req->extra_config()) : boost::none, req->has_dimension_label() ? boost::make_optional(req->dimension_label()) : boost::none, + req->has_table_type() ? 
boost::make_optional(req->table_type()) : boost::none, nullptr); if (PREDICT_FALSE(!s.ok())) { TabletServerErrorPB::Code code; diff --git a/src/kudu/tserver/ts_tablet_manager-test.cc b/src/kudu/tserver/ts_tablet_manager-test.cc index e82fd20..915e349 100644 --- a/src/kudu/tserver/ts_tablet_manager-test.cc +++ b/src/kudu/tserver/ts_tablet_manager-test.cc @@ -121,6 +121,7 @@ class TsTabletManagerTest : public KuduTest { config_, std::move(extra_config), std::move(dimension_label), + /*table_type*/boost::none, &tablet_replica)); if (out_tablet_replica) { (*out_tablet_replica) = tablet_replica; diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index 355a2b7..3b1127a 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -55,6 +55,7 @@ #include "kudu/tablet/tablet_bootstrap.h" #include "kudu/tablet/tablet_metadata.h" #include "kudu/tablet/tablet_replica.h" +#include "kudu/transactions/txn_status_manager.h" #include "kudu/tserver/heartbeater.h" #include "kudu/tserver/tablet_server.h" #include "kudu/util/debug/trace_event.h" @@ -220,6 +221,7 @@ using kudu::tablet::TABLET_DATA_TOMBSTONED; using kudu::tablet::TabletDataState; using kudu::tablet::TabletMetadata; using kudu::tablet::TabletReplica; +using kudu::transactions::TxnStatusManagerFactory; using kudu::tserver::TabletCopyClient; using std::make_shared; using std::set; @@ -451,6 +453,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id, RaftConfigPB config, boost::optional<TableExtraConfigPB> extra_config, boost::optional<string> dimension_label, + boost::optional<TableTypePB> table_type, scoped_refptr<TabletReplica>* replica) { CHECK_EQ(state(), MANAGER_RUNNING); CHECK(IsRaftConfigMember(server_->instance_pb().permanent_uuid(), config)); @@ -491,6 +494,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id, /*supports_live_row_count=*/ true, std::move(extra_config), std::move(dimension_label), + 
std::move(table_type), &meta), "Couldn't create tablet metadata"); @@ -826,12 +830,17 @@ Status TSTabletManager::CreateAndRegisterTabletReplica( scoped_refptr<TabletMetadata> meta, RegisterTabletReplicaMode mode, scoped_refptr<TabletReplica>* replica_out) { + // TODO(awong): this factory will at some point contain some tserver-wide + // state like a system client that can make calls to leader tablets. For now, + // just use a simple local factory. + TxnStatusManagerFactory tsm_factory; const auto& tablet_id = meta->tablet_id(); scoped_refptr<TabletReplica> replica( new TabletReplica(std::move(meta), cmeta_manager_, local_peer_pb_, server_->tablet_apply_pool(), + &tsm_factory, [this, tablet_id](const string& reason) { this->MarkTabletDirty(tablet_id, reason); })); diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h index 042f418..5ecc73f 100644 --- a/src/kudu/tserver/ts_tablet_manager.h +++ b/src/kudu/tserver/ts_tablet_manager.h @@ -28,6 +28,7 @@ #include <gtest/gtest_prod.h> +#include "kudu/common/common.pb.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" @@ -55,7 +56,6 @@ class NodeInstancePB; class Partition; class PartitionSchema; class Schema; -class TableExtraConfigPB; class ThreadPool; namespace consensus { @@ -126,6 +126,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf { consensus::RaftConfigPB config, boost::optional<TableExtraConfigPB> extra_config, boost::optional<std::string> dimension_label, + boost::optional<TableTypePB> table_type, scoped_refptr<tablet::TabletReplica>* replica); // Delete the specified tablet asynchronously with callback 'cb'. 
diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto index 8e8eb6b..51dfcf3 100644 --- a/src/kudu/tserver/tserver_admin.proto +++ b/src/kudu/tserver/tserver_admin.proto @@ -53,6 +53,10 @@ message CreateTabletRequestPB { // UUID of server this request is addressed to. optional bytes dest_uuid = 8; + // The type of table this tablet belongs to. If not set, the assumption is + // this is a user-defined table as opposed to a Kudu-internal system table. + optional TableTypePB table_type = 13; + required bytes table_id = 1; required bytes tablet_id = 2; // DEPRECATED.
