This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4b6d8917b79b9de53957174ade1a7ffc76e0090 Author: shenxingwuying <[email protected]> AuthorDate: Tue Jan 4 20:07:29 2022 +0800 KUDU-3197 [tserver] optimal Schema's memory used, using std::shared_ptr Change TabletMetadata's Schema* member to std::shared_ptr<Schema> to reduce the memory used when altering a schema. TabletMetadata saves the older schemas in old_schemas on every schema alteration, because they may still be in use by scanners or compaction jobs. As KUDU-3197 describes, altering the schema frequently causes the tserver's memory usage to grow very large, much like a memory leak, especially when the number of columns is very large. The JIRA was filed by wangningito, and I am continuing his work; this patch uses std::shared_ptr instead of scoped_refptr<Schema>, because scoped_refptr<Schema> requires too many changes, as seen in: https://gerrit.cloudera.org/c/18098/ Change-Id: Ic284dde108c49130419d876c6698b40c195e9b35 Reviewed-on: http://gerrit.cloudera.org:8080/18255 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/client/client-test.cc | 10 ++--- src/kudu/common/schema.h | 1 + src/kudu/integration-tests/linked_list-test-util.h | 2 +- src/kudu/master/sys_catalog.cc | 5 ++- src/kudu/tablet/all_types-scan-correctness-test.cc | 2 +- src/kudu/tablet/cfile_set.cc | 4 +- src/kudu/tablet/cfile_set.h | 2 +- src/kudu/tablet/diff_scan-test.cc | 6 +-- src/kudu/tablet/diskrowset.cc | 8 ++-- src/kudu/tablet/mt-tablet-test.cc | 2 +- src/kudu/tablet/ops/alter_schema_op.cc | 10 ++--- src/kudu/tablet/ops/alter_schema_op.h | 8 ++-- src/kudu/tablet/ops/op.h | 4 +- src/kudu/tablet/ops/write_op.h | 7 ++- src/kudu/tablet/rowset_metadata.h | 2 +- src/kudu/tablet/tablet-schema-test.cc | 10 ++--- src/kudu/tablet/tablet-test-util.h | 3 +- src/kudu/tablet/tablet.cc | 47 +++++++++++--------- src/kudu/tablet/tablet.h | 6 +-- src/kudu/tablet/tablet_bootstrap.cc | 8 ++-- src/kudu/tablet/tablet_metadata.cc | 51 +++++++++++----------- src/kudu/tablet/tablet_metadata.h | 29 +++++------- 
src/kudu/tablet/tablet_replica-test.cc | 10 ++--- src/kudu/tools/kudu-tool-test.cc | 8 ++-- src/kudu/tools/tool_action_fs.cc | 8 ++-- src/kudu/tools/tool_action_local_replica.cc | 15 ++++--- src/kudu/tserver/scanners.cc | 5 ++- src/kudu/tserver/tablet_server-test.cc | 2 +- src/kudu/tserver/tablet_service.cc | 23 ++++++---- src/kudu/tserver/tserver_path_handlers.cc | 12 ++--- 30 files changed, 161 insertions(+), 149 deletions(-) diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 94ec37d..75b2bce 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -4709,7 +4709,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) { ->Default(KuduValue::CopyString("hello!")); ASSERT_OK(table_alterer->Alter()); ASSERT_EQ(4, tablet_replica->tablet()->metadata()->schema_version()); - Schema schema = tablet_replica->tablet()->metadata()->schema(); + Schema schema = *tablet_replica->tablet()->metadata()->schema(); ColumnSchema col_schema = schema.column(schema.find_column("string_val")); ASSERT_FALSE(col_schema.has_read_default()); ASSERT_TRUE(col_schema.has_write_default()); @@ -4723,7 +4723,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) { ->Default(KuduValue::FromInt(54321)); ASSERT_OK(table_alterer->Alter()); ASSERT_EQ(5, tablet_replica->tablet()->metadata()->schema_version()); - Schema schema = tablet_replica->tablet()->metadata()->schema(); + Schema schema = *tablet_replica->tablet()->metadata()->schema(); ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default")); ASSERT_TRUE(col_schema.has_read_default()); // Started with a default ASSERT_TRUE(col_schema.has_write_default()); @@ -4737,7 +4737,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) { ->RemoveDefault(); ASSERT_OK(table_alterer->Alter()); ASSERT_EQ(6, tablet_replica->tablet()->metadata()->schema_version()); - Schema schema = tablet_replica->tablet()->metadata()->schema(); + Schema schema = 
*tablet_replica->tablet()->metadata()->schema(); ColumnSchema col_schema = schema.column(schema.find_column("string_val")); ASSERT_FALSE(col_schema.has_read_default()); ASSERT_FALSE(col_schema.has_write_default()); @@ -4750,7 +4750,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) { ->RemoveDefault(); ASSERT_OK(table_alterer->Alter()); ASSERT_EQ(7, tablet_replica->tablet()->metadata()->schema_version()); - Schema schema = tablet_replica->tablet()->metadata()->schema(); + Schema schema = *tablet_replica->tablet()->metadata()->schema(); ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default")); ASSERT_TRUE(col_schema.has_read_default()); ASSERT_FALSE(col_schema.has_write_default()); @@ -4779,7 +4779,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) { ->BlockSize(16 * 1024 * 1024); ASSERT_OK(table_alterer->Alter()); ASSERT_EQ(8, tablet_replica->tablet()->metadata()->schema_version()); - Schema schema = tablet_replica->tablet()->metadata()->schema(); + Schema schema = *tablet_replica->tablet()->metadata()->schema(); ColumnSchema col_schema = schema.column(schema.find_column("string_val")); ASSERT_EQ(KuduColumnStorageAttributes::PLAIN_ENCODING, col_schema.attributes().encoding); ASSERT_EQ(KuduColumnStorageAttributes::LZ4, col_schema.attributes().compression); diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h index 725a0e8..6662319 100644 --- a/src/kudu/common/schema.h +++ b/src/kudu/common/schema.h @@ -48,6 +48,7 @@ namespace kudu { class Schema; +typedef std::shared_ptr<Schema> SchemaPtr; } // namespace kudu // Check that two schemas are equal, yielding a useful error message diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h index 8e395df..89eae43 100644 --- a/src/kudu/integration-tests/linked_list-test-util.h +++ b/src/kudu/integration-tests/linked_list-test-util.h @@ -564,7 +564,7 @@ inline Status LinkedListTester::VerifyLinkedListLocal( GenerateSplitInts()); 
verifier.StartScanTimer(); - const Schema* tablet_schema = tablet->schema(); + const Schema* tablet_schema = tablet->schema().get(); // Cannot use schemas with col indexes in a scan (assertions fire). Schema projection(tablet_schema->columns(), tablet_schema->num_key_columns()); std::unique_ptr<RowwiseIterator> iter; diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index 0d417a5..5b61cd7 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -252,9 +252,10 @@ Status SysCatalogTable::Load(FsManager *fs_manager) { RETURN_NOT_OK(tablet::TabletMetadata::Load(fs_manager, kSysCatalogTabletId, &metadata)); // Verify that the schema is the current one - if (metadata->schema() != BuildTableSchema()) { + const SchemaPtr schema_ptr = metadata->schema(); + if (*schema_ptr != BuildTableSchema()) { // TODO: In this case we probably should execute the migration step. - return(Status::Corruption("Unexpected schema", metadata->schema().ToString())); + return(Status::Corruption("Unexpected schema", schema_ptr->ToString())); } LOG(INFO) << "Verifying existing consensus state"; diff --git a/src/kudu/tablet/all_types-scan-correctness-test.cc b/src/kudu/tablet/all_types-scan-correctness-test.cc index 7dfa1d6..5340e04 100644 --- a/src/kudu/tablet/all_types-scan-correctness-test.cc +++ b/src/kudu/tablet/all_types-scan-correctness-test.cc @@ -363,7 +363,7 @@ public: } else { default_ptr = rowops_.GenerateElement(read_default); } - SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); builder.RemoveColumn("val_c"); ASSERT_OK(builder.AddColumn("val_c", rowops_.type_, true, default_ptr, nullptr)); AlterSchema(builder.Build()); diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc index f9899e4..a9efaf0 100644 --- a/src/kudu/tablet/cfile_set.cc +++ b/src/kudu/tablet/cfile_set.cc @@ -220,7 +220,7 @@ CFileReader* CFileSet::key_index_reader() const { // If there 
is no special index cfile, then we have a non-compound key // and we can just use the key column. // This is always the first column listed in the tablet schema. - int key_col_id = tablet_schema().column_id(0); + int key_col_id = tablet_schema()->column_id(0); return FindOrDie(readers_by_col_id_, key_col_id).get(); } @@ -423,7 +423,7 @@ Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec *spec) { Schema key_schema_for_vlog; if (VLOG_IS_ON(1)) { - key_schema_for_vlog = base_data_->tablet_schema().CreateKeyProjection(); + key_schema_for_vlog = base_data_->tablet_schema()->CreateKeyProjection(); } const auto* lb_key = spec->lower_bound_key(); diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h index 1b06df0..34eb4b3 100644 --- a/src/kudu/tablet/cfile_set.h +++ b/src/kudu/tablet/cfile_set.h @@ -156,7 +156,7 @@ class CFileSet : // (the ad-hoc reader for composite keys, otherwise the key column reader) cfile::CFileReader* key_index_reader() const; - const Schema &tablet_schema() const { return rowset_metadata_->tablet_schema(); } + const SchemaPtr tablet_schema() const { return rowset_metadata_->tablet_schema(); } std::shared_ptr<RowSetMetadata> rowset_metadata_; std::shared_ptr<MemTracker> bloomfile_tracker_; diff --git a/src/kudu/tablet/diff_scan-test.cc b/src/kudu/tablet/diff_scan-test.cc index cecc367..6dded93 100644 --- a/src/kudu/tablet/diff_scan-test.cc +++ b/src/kudu/tablet/diff_scan-test.cc @@ -104,7 +104,7 @@ TEST_P(DiffScanTest, TestDiffScan) { opts.include_deleted_rows = include_deleted_rows; static const bool kIsDeletedDefault = false; - SchemaBuilder builder(tablet->metadata()->schema()); + SchemaBuilder builder(*tablet->metadata()->schema()); if (order_mode == ORDERED) { // Define our diff scan to start from snap1. 
// NOTE: it isn't critical to set this given the default is -Inf, but it @@ -188,7 +188,7 @@ TEST_F(OrderedDiffScanWithDeletesTest, TestKudu3108) { opts.order = ORDERED; opts.include_deleted_rows = true; static const bool kIsDeletedDefault = false; - SchemaBuilder builder(tablet->metadata()->schema()); + SchemaBuilder builder(*tablet->metadata()->schema()); ASSERT_OK(builder.AddColumn("deleted", IS_DELETED, /*is_nullable=*/ false, /*read_default=*/ &kIsDeletedDefault, @@ -247,7 +247,7 @@ TEST_F(OrderedDiffScanWithDeletesTest, TestDiffScanAfterDeltaFlushRacesWithBatch opts.snap_to_exclude = snap1; opts.snap_to_include = snap2; opts.order = ORDERED;; - SchemaBuilder builder(tablet->metadata()->schema()); + SchemaBuilder builder(*tablet->metadata()->schema()); Schema projection = builder.BuildWithoutIds(); opts.projection = &projection; diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc index da2ae7a..13c6d06 100644 --- a/src/kudu/tablet/diskrowset.cc +++ b/src/kudu/tablet/diskrowset.cc @@ -630,10 +630,10 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids, DCHECK(open_); shared_lock<rw_spinlock> l(component_lock_); - const Schema* schema = &rowset_metadata_->tablet_schema(); + const SchemaPtr schema_ptr = rowset_metadata_->tablet_schema(); RowIteratorOptions opts; - opts.projection = schema; + opts.projection = schema_ptr.get(); opts.io_context = io_context; vector<shared_ptr<DeltaStore>> included_stores; unique_ptr<DeltaIterator> delta_iter; @@ -641,7 +641,7 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids, opts, REDO, &included_stores, &delta_iter)); out->reset(new MajorDeltaCompaction(rowset_metadata_->fs_manager(), - *schema, + *schema_ptr, base_data_.get(), std::move(delta_iter), std::move(included_stores), @@ -910,7 +910,7 @@ Status DiskRowSet::DebugDump(vector<string> *lines) { // Using CompactionInput to dump our data is an easy way of seeing all the // rows and deltas. 
unique_ptr<CompactionInput> input; - RETURN_NOT_OK(NewCompactionInput(&rowset_metadata_->tablet_schema(), + RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(), MvccSnapshot::CreateSnapshotIncludingAllOps(), nullptr, &input)); return DebugDumpCompactionInput(input.get(), lines); diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc index 937f4a0..05f29b3 100644 --- a/src/kudu/tablet/mt-tablet-test.cc +++ b/src/kudu/tablet/mt-tablet-test.cc @@ -107,7 +107,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter)); uint64_t count; CHECK_OK(tablet()->CountRows(&count)); - const Schema* schema = tablet()->schema(); + const Schema* schema = tablet()->schema().get(); ColumnSchema valcol = schema->column(schema->find_column("val")); valcol_projection_ = Schema({ valcol }, 0); CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter)); diff --git a/src/kudu/tablet/ops/alter_schema_op.cc b/src/kudu/tablet/ops/alter_schema_op.cc index 1c73ea2..5e386d3 100644 --- a/src/kudu/tablet/ops/alter_schema_op.cc +++ b/src/kudu/tablet/ops/alter_schema_op.cc @@ -96,17 +96,17 @@ Status AlterSchemaOp::Prepare() { TRACE("PREPARE ALTER-SCHEMA: Starting"); // Decode schema - unique_ptr<Schema> schema(new Schema); - Status s = SchemaFromPB(state_->request()->schema(), schema.get()); + SchemaPtr schema_ptr = std::make_shared<Schema>(); + Status s = SchemaFromPB(state_->request()->schema(), schema_ptr.get()); if (!s.ok()) { state_->completion_callback()->set_error(s, TabletServerErrorPB::INVALID_SCHEMA); return s; } Tablet* tablet = state_->tablet_replica()->tablet(); - RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema.get())); + RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema_ptr)); - state_->AddToAutoReleasePool(std::move(schema)); + state_->AddToAutoReleasePool(std::move(schema_ptr)); TRACE("PREPARE ALTER-SCHEMA: finished"); return s; @@ -138,7 
+138,7 @@ Status AlterSchemaOp::Apply(CommitMsg** commit_msg) { } state_->tablet_replica()->log() - ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()), + ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema().get()), state_->schema_version()); // Altered tablets should be included in the next tserver heartbeat so that diff --git a/src/kudu/tablet/ops/alter_schema_op.h b/src/kudu/tablet/ops/alter_schema_op.h index 9bccd2a..022544e 100644 --- a/src/kudu/tablet/ops/alter_schema_op.h +++ b/src/kudu/tablet/ops/alter_schema_op.h @@ -24,6 +24,7 @@ #include <boost/optional/optional.hpp> #include "kudu/common/common.pb.h" +#include "kudu/common/schema.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/gutil/macros.h" #include "kudu/tablet/ops/op.h" @@ -32,7 +33,6 @@ namespace kudu { -class Schema; class rw_semaphore; namespace tablet { @@ -59,8 +59,8 @@ class AlterSchemaOpState : public OpState { const tserver::AlterSchemaRequestPB* request() const override { return request_; } tserver::AlterSchemaResponsePB* response() const override { return response_; } - void set_schema(const Schema* schema) { schema_ = schema; } - const Schema* schema() const { return schema_; } + void set_schema(const SchemaPtr& schema) { schema_ = schema; } + const SchemaPtr schema() const { return schema_; } std::string new_table_name() const { return request_->new_table_name(); @@ -109,7 +109,7 @@ class AlterSchemaOpState : public OpState { DISALLOW_COPY_AND_ASSIGN(AlterSchemaOpState); // The new (target) Schema. - const Schema* schema_; + SchemaPtr schema_; // The original RPC request and response. const tserver::AlterSchemaRequestPB *request_; diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h index c94837c..024b53d 100644 --- a/src/kudu/tablet/ops/op.h +++ b/src/kudu/tablet/ops/op.h @@ -209,7 +209,7 @@ class OpState { } // Sets a heap object to be managed by this op's AutoReleasePool. 
- void AddToAutoReleasePool(std::unique_ptr<Schema> t) { + void AddToAutoReleasePool(SchemaPtr t) { schemas_pool_.emplace_back(std::move(t)); } @@ -283,7 +283,7 @@ class OpState { // Optional callback to be called once the op completes. std::unique_ptr<OpCompletionCallback> completion_clbk_; - std::deque<std::unique_ptr<Schema>> schemas_pool_; + std::deque<SchemaPtr> schemas_pool_; // This operation's timestamp. // This is only set once during the operation lifecycle, using external synchronization. diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h index 8c14176..4751d39 100644 --- a/src/kudu/tablet/ops/write_op.h +++ b/src/kudu/tablet/ops/write_op.h @@ -193,9 +193,10 @@ class WriteOpState : public OpState { void ReleaseMvccTxn(Op::OpResult result); - void set_schema_at_decode_time(const Schema* schema) { + void set_schema_at_decode_time(const SchemaPtr& schema) { std::lock_guard<simple_spinlock> l(op_state_lock_); - schema_at_decode_time_ = schema; + schema_ptr_at_decode_time_ = schema; + schema_at_decode_time_ = schema.get(); } const Schema* schema_at_decode_time() const { @@ -319,6 +320,8 @@ class WriteOpState : public OpState { // at APPLY time to ensure we don't have races against schema change. // Protected by op_state_lock_. const Schema* schema_at_decode_time_; + // protect schema_at_decode_time_ + SchemaPtr schema_ptr_at_decode_time_; // Lock that protects access to various fields of WriteOpState. 
mutable simple_spinlock op_state_lock_; diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h index c4be77d..70a6b1f 100644 --- a/src/kudu/tablet/rowset_metadata.h +++ b/src/kudu/tablet/rowset_metadata.h @@ -99,7 +99,7 @@ class RowSetMetadata { int64_t id() const { return id_; } - const Schema& tablet_schema() const { + const SchemaPtr tablet_schema() const { return tablet_metadata_->schema(); } diff --git a/src/kudu/tablet/tablet-schema-test.cc b/src/kudu/tablet/tablet-schema-test.cc index de26638..1886f81 100644 --- a/src/kudu/tablet/tablet-schema-test.cc +++ b/src/kudu/tablet/tablet-schema-test.cc @@ -146,7 +146,7 @@ TEST_F(TestTabletSchema, TestWrite) { const int32_t c2_write_default = 5; const int32_t c2_read_default = 7; - SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default)); AlterSchema(builder.Build()); Schema s2 = builder.BuildWithoutIds(); @@ -189,7 +189,7 @@ TEST_F(TestTabletSchema, TestReInsert) { const int32_t c2_write_default = 5; const int32_t c2_read_default = 7; - SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default)); AlterSchema(builder.Build()); Schema s2 = builder.BuildWithoutIds(); @@ -219,7 +219,7 @@ TEST_F(TestTabletSchema, TestRenameProjection) { InsertRow(client_schema_, 1); // Switch schema to s2 - SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.RenameColumn("c1", "c1_renamed")); AlterSchema(builder.Build()); Schema s2 = builder.BuildWithoutIds(); @@ -260,7 +260,7 @@ TEST_F(TestTabletSchema, TestDeleteAndReAddColumn) { VerifyTabletRows(client_schema_, keys); // Switch schema to s2 - SchemaBuilder builder(tablet()->metadata()->schema()); + 
SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.RemoveColumn("c1")); // NOTE this new 'c1' will have a different id from the previous one // so the data added to the previous 'c1' will not be visible. @@ -279,7 +279,7 @@ TEST_F(TestTabletSchema, TestModifyEmptyMemRowSet) { std::vector<std::pair<string, string> > keys; // Switch schema to s2 - SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.AddNullableColumn("c2", INT32)); AlterSchema(builder.Build()); Schema s2 = builder.BuildWithoutIds(); diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h index 1ca5a1a..bf010cb 100644 --- a/src/kudu/tablet/tablet-test-util.h +++ b/src/kudu/tablet/tablet-test-util.h @@ -150,8 +150,9 @@ class KuduTabletTest : public KuduTest { *(req.mutable_new_extra_config()) = *extra_config; } + SchemaPtr schema_ptr = std::make_shared<Schema>(schema); AlterSchemaOpState op_state(nullptr, &req, nullptr); - ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, &schema)); + ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, schema_ptr)); ASSERT_OK(tablet()->AlterSchema(&op_state)); op_state.Finish(); } diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 28bb0cf..50eda43 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -262,7 +262,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata, shared_ptr<MemTracker> parent_mem_tracker, MetricRegistry* metric_registry, scoped_refptr<LogAnchorRegistry> log_anchor_registry) - : key_schema_(metadata->schema().CreateKeyProjection()), + : key_schema_(metadata->schema()->CreateKeyProjection()), metadata_(std::move(metadata)), log_anchor_registry_(std::move(log_anchor_registry)), mem_trackers_(tablet_id(), std::move(parent_mem_tracker)), @@ -371,7 +371,8 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids, // Now that the current state is loaded, 
create the new MemRowSet with the next id. shared_ptr<MemRowSet> new_mrs; - RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(), + const SchemaPtr schema_ptr = schema(); + RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr, log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &new_mrs)); @@ -386,7 +387,7 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids, // NOTE: we are able to FindOrDie() on these IDs because // 'txn_ids_with_mrs' is a subset of the transaction IDs known by the // metadata. - RETURN_NOT_OK(MemRowSet::Create(0, *schema(), txn_id, FindOrDie(txn_meta_by_id, txn_id), + RETURN_NOT_OK(MemRowSet::Create(0, *schema_ptr, txn_id, FindOrDie(txn_meta_by_id, txn_id), log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &txn_mrs)); @@ -462,7 +463,7 @@ void Tablet::Shutdown() { Status Tablet::GetMappedReadProjection(const Schema& projection, Schema *mapped_projection) const { - const Schema* cur_schema = schema(); + const SchemaPtr cur_schema = schema(); return cur_schema->GetMappedReadProjection(projection, mapped_projection); } @@ -541,17 +542,18 @@ Status Tablet::DecodeWriteOperations(const Schema* client_schema, TRACE("Decoding operations"); vector<DecodedRowOperation> ops; + SchemaPtr schema_ptr = schema(); // Decode the ops RowOperationsPBDecoder dec(&op_state->request()->row_operations(), client_schema, - schema(), + schema_ptr.get(), op_state->arena()); RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops)); TRACE_COUNTER_INCREMENT("num_ops", ops.size()); // Important to set the schema before the ops -- we need the // schema in order to stringify the ops. 
- op_state->set_schema_at_decode_time(schema()); + op_state->set_schema_at_decode_time(schema_ptr); op_state->SetRowOps(std::move(ops)); return Status::OK(); @@ -605,8 +607,8 @@ Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const { const auto& ps = metadata_->partition_schema(); if (PREDICT_FALSE(!ps.PartitionContainsRow(metadata_->partition(), row))) { return Status::NotFound( - Substitute("Row not in tablet partition. Partition: '$0' row: '$1'", - ps.PartitionDebugString(metadata_->partition(), *schema()), + Substitute("Row not in tablet partition. Partition: '$0', row: '$1'.", + ps.PartitionDebugString(metadata_->partition(), *schema().get()), ps.PartitionKeyDebugString(row))); } return Status::OK(); @@ -744,7 +746,7 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context, } Timestamp ts = op_state->timestamp(); - ConstContiguousRow row(schema(), op->decoded_op.row_data); + ConstContiguousRow row(schema().get(), op->decoded_op.row_data); // TODO(todd): the Insert() call below will re-encode the key, which is a // waste. Should pass through the KeyProbe structure perhaps. 
@@ -819,7 +821,7 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context, RowOp* upsert, RowSet* rowset, ProbeStats* stats) { - const auto* schema = this->schema(); + const auto* schema = this->schema().get(); ConstContiguousRow row(schema, upsert->decoded_op.row_data); faststring buf; RowChangeListEncoder enc(&buf); @@ -991,7 +993,8 @@ void Tablet::StartApplying(ParticipantOpState* op_state) { void Tablet::CreateTxnRowSets(int64_t txn_id, scoped_refptr<TxnMetadata> txn_meta) { shared_ptr<MemRowSet> new_mrs; - CHECK_OK(MemRowSet::Create(0, *schema(), txn_id, std::move(txn_meta), + const SchemaPtr schema_ptr = schema(); + CHECK_OK(MemRowSet::Create(0, *schema_ptr, txn_id, std::move(txn_meta), log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &new_mrs)); @@ -1266,7 +1269,7 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context, } DCHECK(op_state != nullptr) << "must have a WriteOpState"; DCHECK(op_state->op_id().IsInitialized()) << "OpState OpId needed for anchoring"; - DCHECK_EQ(op_state->schema_at_decode_time(), schema()); + DCHECK_EQ(op_state->schema_at_decode_time(), schema().get()); // If we were unable to check rowset presence in batch (e.g. because we are processing // a batch which contains some duplicate keys) we need to do so now. 
@@ -1524,7 +1527,8 @@ Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction, } shared_ptr<MemRowSet> new_mrs; - RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(), + const SchemaPtr schema_ptr = schema(); + RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr, log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &new_mrs)); @@ -1539,7 +1543,7 @@ Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction, } Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state, - const Schema* schema) { + const SchemaPtr& schema) { if (!schema->has_column_ids()) { // this probably means that the request is not from the Master @@ -1556,7 +1560,7 @@ Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state, } Status Tablet::AlterSchema(AlterSchemaOpState* op_state) { - DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema()))) + DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema().get()))) << "Schema keys cannot be altered(except name)"; // Prevent any concurrent flushes. Otherwise, we run into issues where @@ -1580,7 +1584,7 @@ Status Tablet::AlterSchema(AlterSchemaOpState* op_state) { << " to " << op_state->schema()->ToString() << " version " << op_state->schema_version(); DCHECK(schema_lock_.is_locked()); - metadata_->SetSchema(*op_state->schema(), op_state->schema_version()); + metadata_->SetSchema(op_state->schema(), op_state->schema_version()); if (op_state->has_new_table_name()) { metadata_->SetTableName(op_state->new_table_name()); if (metric_entity_) { @@ -1609,7 +1613,8 @@ Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema, // to flush. 
VLOG_WITH_PREFIX(1) << "Rewinding schema during bootstrap to " << new_schema.ToString(); - metadata_->SetSchema(new_schema, schema_version); + SchemaPtr schema = std::make_shared<Schema>(new_schema); + metadata_->SetSchema(schema, schema_version); { std::lock_guard<rw_spinlock> lock(component_lock_); @@ -1617,7 +1622,7 @@ Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema, shared_ptr<RowSetTree> old_rowsets = components_->rowsets; CHECK(old_mrs->empty()); shared_ptr<MemRowSet> new_mrs; - RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), new_schema, + RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), *schema, log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &new_mrs)); @@ -1872,7 +1877,8 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input, } shared_ptr<CompactionInput> merge; - RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema(), &io_context, &merge)); + const SchemaPtr schema_ptr = schema(); + RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema_ptr.get(), &io_context, &merge)); RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(), compaction_policy_->target_rowset_size()); @@ -2014,8 +2020,9 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input, VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates " "which arrived during Phase 1. 
Snapshot: $1", op_name, non_duplicated_ops_snap.ToString()); + const SchemaPtr schema_ptr2 = schema(); RETURN_NOT_OK_PREPEND( - input.CreateCompactionInput(non_duplicated_ops_snap, schema(), &io_context, &merge), + input.CreateCompactionInput(non_duplicated_ops_snap, schema_ptr2.get(), &io_context, &merge), Substitute("Failed to create $0 inputs", op_name).c_str()); // Update the output rowsets with the deltas that came in in phase 1, before we swapped diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 4d72cb9..6298c6c 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -261,7 +261,7 @@ class Tablet { // An error will be returned if the specified schema is invalid (e.g. // key mismatch, or missing IDs) Status CreatePreparedAlterSchema(AlterSchemaOpState *op_state, - const Schema* schema); + const SchemaPtr& schema); // Apply the Schema of the specified op. // This operation will trigger a flush on the current MemRowSet. @@ -411,8 +411,8 @@ class Tablet { // has a very small number of rows. Status DebugDump(std::vector<std::string> *lines = NULL); - const Schema* schema() const { - return &metadata_->schema(); + const SchemaPtr schema() const { + return metadata_->schema(); } // Returns a reference to the key projection of the tablet schema. 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index 7ccedfe..d0922a8 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -1535,11 +1535,11 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/, AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request(); // Decode schema - Schema schema; - RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema)); + SchemaPtr schema_ptr = std::make_shared<Schema>(); + RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), schema_ptr.get())); AlterSchemaOpState op_state(nullptr, alter_schema, nullptr); - RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, &schema)); + RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, schema_ptr)); // Apply the alter schema to the tablet RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&op_state), "Failed to AlterSchema:"); @@ -1547,7 +1547,7 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/, if (!op_state.error()) { // If the alter completed successfully, update the log segment header. Note // that our new log isn't hooked up to the tablet yet. 
- log_->SetSchemaForNextLogSegment(std::move(schema), op_state.schema_version()); + log_->SetSchemaForNextLogSegment(*schema_ptr, op_state.schema_version()); } return AppendCommitMsg(commit_msg); diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index 0a36f1c..1c949cb 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -167,9 +167,10 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager, scoped_refptr<TabletMetadata>* metadata) { Status s = Load(fs_manager, tablet_id, metadata); if (s.ok()) { - if ((*metadata)->schema() != schema) { + const SchemaPtr schema_ptr = (*metadata)->schema(); + if (*schema_ptr != schema) { return Status::Corruption(Substitute("Schema on disk ($0) does not " - "match expected schema ($1)", (*metadata)->schema().ToString(), + "match expected schema ($1)", schema_ptr->ToString(), schema.ToString())); } return Status::OK(); @@ -312,8 +313,7 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id, log_prefix_(Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid())), next_rowset_idx_(0), last_durable_mrs_id_(kNoDurableMemStore), - schema_(new Schema(schema)), - schema_ptr_(schema_.get()), + schema_(std::make_shared<Schema>(schema)), schema_version_(0), table_name_(std::move(table_name)), partition_schema_(std::move(partition_schema)), @@ -339,7 +339,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id) tablet_id_(std::move(tablet_id)), fs_manager_(fs_manager), next_rowset_idx_(0), - schema_ptr_(nullptr), num_flush_pins_(0), needs_flush_(false), flush_count_for_tests_(0), @@ -390,11 +389,14 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) table_name_ = superblock.table_name(); uint32_t schema_version = superblock.schema_version(); - unique_ptr<Schema> schema(new Schema()); + SchemaPtr schema = std::make_shared<Schema>(); RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock.schema(), 
schema.get()), "Failed to parse Schema from superblock " + SecureShortDebugString(superblock)); - SetSchemaUnlocked(std::move(schema), schema_version); + { + SchemaPtr old_schema; + SwapSchemaUnlocked(schema, schema_version, &old_schema); + } if (!superblock.has_partition()) { // KUDU-818: Possible backward compatibility issue with tables created @@ -418,7 +420,7 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) CHECK_EQ(table_id_, superblock.table_id()); PartitionSchema partition_schema; RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(), - *schema_ptr_, &partition_schema)); + *schema_, &partition_schema)); CHECK(partition_schema_ == partition_schema); Partition partition; @@ -743,7 +745,7 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, partition_.ToPB(pb.mutable_partition()); pb.set_last_durable_mrs_id(last_durable_mrs_id_); pb.set_schema_version(schema_version_); - RETURN_NOT_OK(partition_schema_.ToPB(*schema_ptr_, pb.mutable_partition_schema())); + RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema())); pb.set_table_name(table_name_); for (const shared_ptr<RowSetMetadata>& meta : rowsets) { @@ -756,8 +758,8 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb); } - DCHECK(schema_ptr_->has_column_ids()); - RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_ptr_, pb.mutable_schema()), + DCHECK(schema_->has_column_ids()); + RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_, pb.mutable_schema()), "Couldn't serialize schema into superblock"); pb.set_tablet_data_state(tablet_data_state_); @@ -937,22 +939,21 @@ RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) { return nullptr; } -void TabletMetadata::SetSchema(const Schema& schema, uint32_t version) { - unique_ptr<Schema> new_schema(new Schema(schema)); - std::lock_guard<LockType> l(data_lock_); - 
SetSchemaUnlocked(std::move(new_schema), version); +void TabletMetadata::SetSchema(const SchemaPtr& schema, uint32_t version) { + // In case this is the last reference to the schema, destruct the pointer + // outside the lock. + SchemaPtr old_schema; + { + std::lock_guard<LockType> l(data_lock_); + SwapSchemaUnlocked(schema, version, &old_schema); + } } -void TabletMetadata::SetSchemaUnlocked( - unique_ptr<Schema> new_schema, uint32_t version) { - DCHECK(new_schema->has_column_ids()); - - // "Release" barrier ensures that, when we publish the new Schema object, - // all of its initialization is also visible. - base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&schema_ptr_), - reinterpret_cast<AtomicWord>(new_schema.get())); - std::swap(schema_, new_schema); - old_schemas_.emplace_back(std::move(new_schema)); +void TabletMetadata::SwapSchemaUnlocked(SchemaPtr schema, uint32_t version, + SchemaPtr* old_schema) { + DCHECK(schema->has_column_ids()); + *old_schema = std::move(schema_); + schema_ = std::move(schema); schema_version_ = version; } diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index f545dfa..cda5c85 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -19,6 +19,7 @@ #include <atomic> #include <cstdint> #include <memory> +#include <mutex> #include <string> #include <unordered_map> #include <unordered_set> @@ -29,6 +30,7 @@ #include "kudu/common/common.pb.h" #include "kudu/common/partition.h" +#include "kudu/common/schema.h" #include "kudu/fs/block_id.h" #include "kudu/gutil/atomicops.h" #include "kudu/gutil/macros.h" @@ -43,7 +45,6 @@ namespace kudu { class BlockIdPB; class FsManager; -class Schema; class Timestamp; namespace consensus { @@ -58,6 +59,7 @@ namespace tablet { class RowSetMetadata; class TxnMetadata; + enum TxnState : int8_t; typedef std::vector<std::shared_ptr<RowSetMetadata>> RowSetMetadataVector; @@ -150,19 +152,18 @@ class TabletMetadata : public 
RefCountedThreadSafe<TabletMetadata> { uint32_t schema_version() const; - void SetSchema(const Schema& schema, uint32_t version); + void SetSchema(const SchemaPtr& schema, uint32_t version); void SetTableName(const std::string& table_name); void SetExtraConfig(TableExtraConfigPB extra_config); - // Return a reference to the current schema. + // Return a shared pointer (SchemaPtr) to the current schema. // This pointer will be valid until the TabletMetadata is destructed, // even if the schema is changed. - const Schema& schema() const { - const Schema* s = reinterpret_cast<const Schema*>( - base::subtle::Acquire_Load(reinterpret_cast<const AtomicWord*>(&schema_ptr_))); - return *s; + const SchemaPtr schema() const { + std::lock_guard<LockType> l(data_lock_); + return schema_; } // Returns the partition schema of the tablet's table. @@ -375,7 +376,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { // Constructor for loading an existing tablet. TabletMetadata(FsManager* fs_manager, std::string tablet_id); - void SetSchemaUnlocked(std::unique_ptr<Schema> schema, uint32_t version); + void SwapSchemaUnlocked(SchemaPtr schema, uint32_t version, SchemaPtr* old_schema); Status LoadFromDisk(); @@ -443,21 +444,11 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metadata_by_txn_id_;; // The current schema version. - std::unique_ptr<Schema> schema_; - // The raw pointer to the schema for an atomic swap. - Schema* schema_ptr_; - + SchemaPtr schema_; uint32_t schema_version_; std::string table_name_; PartitionSchema partition_schema_; - // Previous values of 'schema_'. - // These are currently kept alive forever, under the assumption that - // a given tablet won't have thousands of "alter table" calls. - // They are kept alive so that callers of schema() don't need to - // worry about reference counting or locking. 
- std::vector<std::unique_ptr<Schema>> old_schemas_; - // Protected by 'data_lock_'. BlockIdSet orphaned_blocks_; diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc index db077bd..e6980a9 100644 --- a/src/kudu/tablet/tablet_replica-test.cc +++ b/src/kudu/tablet/tablet_replica-test.cc @@ -32,9 +32,9 @@ #include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" #include "kudu/common/row_operations.h" +#include "kudu/common/row_operations.pb.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.h" -#include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/log.h" #include "kudu/consensus/log_anchor_registry.h" @@ -540,11 +540,11 @@ TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) { ConsensusBootstrapInfo info; ASSERT_OK(StartReplicaAndWaitUntilLeader(info)); SchemaPB orig_schema_pb; - ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb)); + ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb)); const int orig_schema_version = tablet()->metadata()->schema_version(); // Add a new column. - SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.AddColumn("new_col", INT32)); Schema new_client_schema = builder.BuildWithoutIds(); SchemaPB new_schema; @@ -581,11 +581,11 @@ TEST_F(TabletReplicaTest, Kudu2690Test) { ConsensusBootstrapInfo info; ASSERT_OK(StartReplicaAndWaitUntilLeader(info)); SchemaPB orig_schema_pb; - ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb)); + ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb)); const int orig_schema_version = tablet()->metadata()->schema_version(); // First things first, add a new column. 
- SchemaBuilder builder(tablet()->metadata()->schema()); + SchemaBuilder builder(*tablet()->metadata()->schema()); ASSERT_OK(builder.AddColumn("new_col", INT32)); Schema new_client_schema = builder.BuildWithoutIds(); SchemaPB new_schema; diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 2383b2f..1b73a3b 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -2431,7 +2431,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) { // Verify the contents of the metadata output SCOPED_TRACE(stdout); string debug_str = meta->partition_schema() - .PartitionDebugString(meta->partition(), meta->schema()); + .PartitionDebugString(meta->partition(), *meta->schema()); StripWhiteSpace(&debug_str); ASSERT_STR_CONTAINS(stdout, debug_str); debug_str = Substitute("Table name: $0 Table id: $1", @@ -2439,7 +2439,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) { ASSERT_STR_CONTAINS(stdout, debug_str); debug_str = Substitute("Schema (version=$0):", meta->schema_version()); ASSERT_STR_CONTAINS(stdout, debug_str); - debug_str = meta->schema().ToString(); + debug_str = meta->schema()->ToString(); StripWhiteSpace(&debug_str); ASSERT_STR_CONTAINS(stdout, debug_str); @@ -2583,7 +2583,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) { SCOPED_TRACE(stdout); debug_str = meta->partition_schema() - .PartitionDebugString(meta->partition(), meta->schema()); + .PartitionDebugString(meta->partition(), *meta->schema()); StripWhiteSpace(&debug_str); ASSERT_STR_CONTAINS(stdout, debug_str); debug_str = Substitute("Table name: $0 Table id: $1", @@ -2592,7 +2592,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) { debug_str = Substitute("Schema (version=$0):", meta->schema_version()); StripWhiteSpace(&debug_str); ASSERT_STR_CONTAINS(stdout, debug_str); - debug_str = meta->schema().ToString(); + debug_str = meta->schema()->ToString(); StripWhiteSpace(&debug_str); ASSERT_STR_CONTAINS(stdout, debug_str); diff --git a/src/kudu/tools/tool_action_fs.cc 
b/src/kudu/tools/tool_action_fs.cc index 12077da..ea530f0 100644 --- a/src/kudu/tools/tool_action_fs.cc +++ b/src/kudu/tools/tool_action_fs.cc @@ -549,7 +549,7 @@ string TabletInfo(Field field, const TabletMetadata& tablet) { case Field::kTabletId: return tablet.tablet_id(); case Field::kPartition: return tablet.partition_schema() .PartitionDebugString(tablet.partition(), - tablet.schema()); + *tablet.schema().get()); default: LOG(FATAL) << "unhandled field (this is a bug): " << ToString(field); } } @@ -575,7 +575,7 @@ string BlockInfo(Field field, case Field::kBlockKind: return block_kind; case Field::kColumn: if (column_id) { - return tablet.schema().column_by_id(*column_id).name(); + return tablet.schema()->column_by_id(*column_id).name(); } else { return ""; } case Field::kColumnId: if (column_id) { @@ -597,8 +597,8 @@ string FormatCFileKeyMetadata(const TabletMetadata& tablet, Arena arena(1024); EncodedKey* key; - CHECK_OK(EncodedKey::DecodeEncodedString(tablet.schema(), &arena, value, &key)); - return key->Stringify(tablet.schema()); + CHECK_OK(EncodedKey::DecodeEncodedString(*tablet.schema().get(), &arena, value, &key)); + return key->Stringify(*tablet.schema().get()); } // Formats the delta stats property from CFile metadata. 
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index 0540abb..03d39ad 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -554,6 +554,7 @@ Status SummarizeDataSize(const RunnerContext& context) { RETURN_NOT_OK_PREPEND(TabletMetadata::Load(fs.get(), tablet_id, &meta), Substitute("could not load tablet metadata for $0", tablet_id)); const string& table_id = meta->table_id(); + const SchemaPtr schema_ptr = meta->schema(); for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets()) { TabletSizeStats rowset_stats; RETURN_NOT_OK(SummarizeSize(fs.get(), rs_meta->redo_delta_blocks(), @@ -570,11 +571,11 @@ Status SummarizeDataSize(const RunnerContext& context) { for (const auto& e : column_blocks_by_id) { const auto& col_id = e.first; const auto& block = e.second; - const auto& col_idx = meta->schema().find_column_by_id(col_id); + const auto& col_idx = schema_ptr->find_column_by_id(col_id); string col_key = Substitute( "c$0 ($1)", col_id, (col_idx != Schema::kColumnNotFound) ? 
- meta->schema().column(col_idx).name() : "?"); + schema_ptr->column(col_idx).name() : "?"); RETURN_NOT_OK(SummarizeSize( fs.get(), { block }, col_key, &rowset_stats.column_bytes[col_key])); } @@ -661,12 +662,12 @@ Status DumpBlockIdsForLocalReplica(const RunnerContext& context) { cout << "Listing all data blocks in tablet " << tablet_id << ":" << endl; - Schema schema = meta->schema(); + SchemaPtr schema = meta->schema(); size_t idx = 0; for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets()) { cout << "Rowset " << idx++ << endl; - RETURN_NOT_OK(ListBlocksInRowSet(schema, *rs_meta)); + RETURN_NOT_OK(ListBlocksInRowSet(*schema.get(), *rs_meta)); } return Status::OK(); @@ -677,11 +678,11 @@ Status DumpTabletMeta(FsManager* fs_manager, scoped_refptr<TabletMetadata> meta; RETURN_NOT_OK(TabletMetadata::Load(fs_manager, tablet_id, &meta)); - const Schema& schema = meta->schema(); + const Schema& schema = *meta->schema().get(); cout << Indent(indent) << "Partition: " << meta->partition_schema().PartitionDebugString(meta->partition(), - meta->schema()) + schema) << endl; cout << Indent(indent) << "Table name: " << meta->table_name() << " Table id: " << meta->table_id() << endl; @@ -736,7 +737,7 @@ Status DumpRowSetInternal(const IOContext& ctx, if (FLAGS_dump_all_columns) { RETURN_NOT_OK(rs->DebugDump(&lines)); } else { - Schema key_proj = rs_meta->tablet_schema().CreateKeyProjection(); + Schema key_proj = rs_meta->tablet_schema()->CreateKeyProjection(); RowIteratorOptions opts; opts.projection = &key_proj; opts.io_context = &ctx; diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc index add5d9d..f07340f 100644 --- a/src/kudu/tserver/scanners.cc +++ b/src/kudu/tserver/scanners.cc @@ -404,15 +404,16 @@ ScanDescriptor Scanner::Descriptor() const { const auto& tablet_metadata = tablet_replica_->tablet_metadata(); descriptor.table_name = tablet_metadata->table_name(); + SchemaPtr schema_ptr = tablet_metadata->schema(); if 
(spec().lower_bound_key()) { descriptor.predicates.emplace_back( Substitute("PRIMARY KEY >= $0", KUDU_REDACT( - spec().lower_bound_key()->Stringify(tablet_metadata->schema())))); + spec().lower_bound_key()->Stringify(*schema_ptr)))); } if (spec().exclusive_upper_bound_key()) { descriptor.predicates.emplace_back( Substitute("PRIMARY KEY < $0", KUDU_REDACT( - spec().exclusive_upper_bound_key()->Stringify(tablet_metadata->schema())))); + spec().exclusive_upper_bound_key()->Stringify(*schema_ptr)))); } for (const auto& predicate : spec().predicates()) { diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 951ee14..ce7fb61 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -3478,7 +3478,7 @@ class InvalidScanRequest_WithIdsParamTest : public ::testing::WithParamInterface<ReadMode> { }; TEST_P(InvalidScanRequest_WithIdsParamTest, Test) { - const Schema* projection = tablet_replica_->tablet()->schema(); + const SchemaPtr projection = tablet_replica_->tablet()->schema(); ASSERT_TRUE(projection->has_column_ids()); VerifyScanRequestFailure(*projection, TabletServerErrorPB::INVALID_SCHEMA, diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index d32e661..908e638 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -1170,7 +1170,8 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req, return; } - const auto& tablet_schema = replica->tablet_metadata()->schema(); + const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema(); + const Schema& tablet_schema = *tablet_schema_ptr; if (req_schema == tablet_schema) { context->RespondSuccess(); return; @@ -2178,8 +2179,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req, // If the token doesn't have full scan privileges for the table, check // for required privileges based on the scan request. 
if (!privilege.scan_privilege()) { - const auto& schema = replica->tablet_metadata()->schema(); - if (!CheckScanPrivilegesOrRespond(scan_pb, schema, authorized_column_ids, + const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); + if (!CheckScanPrivilegesOrRespond(scan_pb, *schema_ptr, authorized_column_ids, "Scan", context)) { return; } @@ -2254,7 +2255,8 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, } if (req->need_schema_info()) { - const auto& tablet_schema = replica->tablet_metadata()->schema(); + const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); + const Schema& tablet_schema = *schema_ptr; CHECK_OK(SchemaToPB(tablet_schema, status->mutable_schema())); CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB( tablet_schema, status->mutable_partition_schema())); @@ -2300,7 +2302,8 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req, return; } if (!privilege.scan_privilege()) { - const auto& schema = replica->tablet_metadata()->schema(); + const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); + const Schema& schema = *schema_ptr; unordered_set<ColumnId> required_column_privileges; if (req->has_start_primary_key() || req->has_stop_primary_key()) { const auto& key_cols = schema.get_key_column_ids(); @@ -2345,7 +2348,8 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req, // Decode encoded key Arena arena(256); - const auto& tablet_schema = replica->tablet_metadata()->schema(); + const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema(); + const Schema& tablet_schema = *tablet_schema_ptr; EncodedKey* start = nullptr; EncodedKey* stop = nullptr; if (req->has_start_primary_key()) { @@ -2482,8 +2486,8 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, // If the token doesn't have full scan privileges for the table, check // for required privileges based on the checksum request. 
if (!privilege.scan_privilege()) { - const auto& schema = replica->tablet_metadata()->schema(); - if (!CheckScanPrivilegesOrRespond(new_req, schema, authorized_column_ids, + const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); + if (!CheckScanPrivilegesOrRespond(new_req, *schema_ptr, authorized_column_ids, "Checksum", context)) { return; } @@ -2788,7 +2792,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, } } - const auto& tablet_schema = replica->tablet_metadata()->schema(); + const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema(); + const Schema& tablet_schema = *tablet_schema_ptr; ScanSpec spec; s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec); diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc index 2137dfa..aa92940 100644 --- a/src/kudu/tserver/tserver_path_handlers.cc +++ b/src/kudu/tserver/tserver_path_handlers.cc @@ -35,6 +35,7 @@ #include "kudu/common/common.pb.h" #include "kudu/common/iterator_stats.h" #include "kudu/common/partition.h" +#include "kudu/common/schema.h" #include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/log_anchor_registry.h" @@ -93,8 +94,6 @@ DECLARE_int32(scan_history_count); namespace kudu { -class Schema; - namespace tserver { namespace { @@ -356,6 +355,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /* for (const scoped_refptr<TabletReplica>& replica : replicas) { EasyJson replica_json = details_json.PushBack(EasyJson::kObject); const auto& tmeta = replica->tablet_metadata(); + const SchemaPtr schema_ptr = tmeta->schema(); TabletStatusPB status; replica->GetTabletStatusPB(&status); replica_json["table_name"] = status.table_name(); @@ -366,7 +366,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /* } replica_json["partition"] = tmeta->partition_schema().PartitionDebugString(tmeta->partition(), - 
tmeta->schema()); + *schema_ptr); replica_json["state"] = replica->HumanReadableState(); if (status.has_estimated_on_disk_size()) { replica_json["n_bytes"] = @@ -425,9 +425,9 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req output->Set("table_name", table_name); const auto& tmeta = replica->tablet_metadata(); - const Schema& schema = tmeta->schema(); + const SchemaPtr schema_ptr = tmeta->schema(); output->Set("partition", - tmeta->partition_schema().PartitionDebugString(tmeta->partition(), schema)); + tmeta->partition_schema().PartitionDebugString(tmeta->partition(), *schema_ptr)); output->Set("on_disk_size", HumanReadableNumBytes::ToString(replica->OnDiskSize())); uint64_t live_row_count; Status s = replica->CountLiveRows(&live_row_count); @@ -437,7 +437,7 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req output->Set("tablet_live_row_count", "N/A"); } - SchemaToJson(schema, output); + SchemaToJson(*schema_ptr, output); } void TabletServerPathHandlers::HandleTabletSVGPage(const Webserver::WebRequest& req,
