Repository: kudu Updated Branches: refs/heads/master ce83b6cef -> 732ee211a
http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/log_block_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index 48fb910..4579b7f 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -1550,7 +1550,7 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts, // TODO(unknown): should we cap the number of outstanding containers and // force callers to block if we've reached it? LogBlockContainer* container; - RETURN_NOT_OK(GetOrCreateContainer(&container)); + RETURN_NOT_OK(GetOrCreateContainer(opts, &container)); // Generate a free block ID. // We have to loop here because earlier versions used non-sequential block IDs, @@ -1568,10 +1568,6 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts, return Status::OK(); } -Status LogBlockManager::CreateBlock(unique_ptr<WritableBlock>* block) { - return CreateBlock(CreateBlockOptions(), block); -} - Status LogBlockManager::OpenBlock(const BlockId& block_id, unique_ptr<ReadableBlock>* block) { scoped_refptr<LogBlock> lb; @@ -1670,9 +1666,10 @@ void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name) } } -Status LogBlockManager::GetOrCreateContainer(LogBlockContainer** container) { +Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts, + LogBlockContainer** container) { DataDir* dir; - RETURN_NOT_OK(dd_manager_.GetNextDataDir(&dir)); + RETURN_NOT_OK(dd_manager_.GetNextDataDir(opts, &dir)); { std::lock_guard<simple_spinlock> l(lock_); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/log_block_manager.h ---------------------------------------------------------------------- diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h index f46f6a8..eead3d2 100644 --- a/src/kudu/fs/log_block_manager.h +++ b/src/kudu/fs/log_block_manager.h @@ -104,10 +104,10 @@ struct LogBlockManagerMetrics; // made available in memory if _all_ on-disk operations (including any // necessary synchronization calls) are successful. // -// When a new block is created, a container is selected using a round-robin -// policy (i.e. the least recently used container). If no containers are -// available, a new one is created. Only when the block is fully written is -// the container returned to the pool of available containers. +// When a new block is created, a container is selected from the data +// directory group appropriate for the block, as indicated by hints in +// provided CreateBlockOptions (i.e. blocks for diskrowsets should be placed +// within its tablet's data directory group). // // All log block manager metadata requests are served from memory. When an // existing block manager is opened, all on-disk container metadata is @@ -175,8 +175,6 @@ class LogBlockManager : public BlockManager { Status CreateBlock(const CreateBlockOptions& opts, std::unique_ptr<WritableBlock>* block) override; - Status CreateBlock(std::unique_ptr<WritableBlock>* block) override; - Status OpenBlock(const BlockId& block_id, std::unique_ptr<ReadableBlock>* block) override; @@ -186,6 +184,8 @@ class LogBlockManager : public BlockManager { Status GetAllBlockIds(std::vector<BlockId>* block_ids) override; + DataDirManager* dd_manager() override { return &dd_manager_; }; + private: FRIEND_TEST(LogBlockManagerTest, TestLookupBlockLimit); FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation); @@ -241,13 +241,14 @@ class LogBlockManager : public BlockManager { // Must be called with 'lock_' held. void RemoveFullContainerUnlocked(const std::string& container_name); - // Returns the next container available for writing using a round-robin - // selection policy, creating a new one if necessary. + // Returns a container appropriate for the given CreateBlockOptions, creating + // a new container if necessary. // // After returning, the container is considered to be in use. When // writing is finished, call MakeContainerAvailable() to make it // available to other writers. - Status GetOrCreateContainer(internal::LogBlockContainer** container); + Status GetOrCreateContainer(const CreateBlockOptions& opts, + internal::LogBlockContainer** container); // Indicate that this container is no longer in use and can be handed out // to other writers. http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/delta_compaction-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_compaction-test.cc b/src/kudu/tablet/delta_compaction-test.cc index 053b672..c1b500e 100644 --- a/src/kudu/tablet/delta_compaction-test.cc +++ b/src/kudu/tablet/delta_compaction-test.cc @@ -69,7 +69,7 @@ class TestDeltaCompaction : public KuduTest { Status GetDeltaFileWriter(gscoped_ptr<DeltaFileWriter>* dfw, BlockId* block_id) const { unique_ptr<WritableBlock> block; - RETURN_NOT_OK(fs_manager_->CreateNewBlock(&block)); + RETURN_NOT_OK(fs_manager_->CreateNewBlock({}, &block)); *block_id = block->id(); dfw->reset(new DeltaFileWriter(std::move(block))); RETURN_NOT_OK((*dfw)->Start()); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/delta_compaction.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc index 5d1488d..af50e98 100644 --- a/src/kudu/tablet/delta_compaction.cc +++ b/src/kudu/tablet/delta_compaction.cc @@ -40,9 +40,7 @@ using std::shared_ptr; namespace kudu { -using cfile::CFileIterator; -using cfile::CFileReader; -using cfile::IndexTreeIterator; +using fs::CreateBlockOptions; using fs::WritableBlock; using std::unique_ptr; using std::vector; @@ -63,7 +61,8 @@ MajorDeltaCompaction::MajorDeltaCompaction( unique_ptr<DeltaIterator> delta_iter, vector<shared_ptr<DeltaStore> > included_stores, vector<ColumnId> col_ids, - HistoryGcOpts history_gc_opts) + HistoryGcOpts history_gc_opts, + string tablet_id) : fs_manager_(fs_manager), base_schema_(base_schema), column_ids_(std::move(col_ids)), @@ -71,6 +70,7 @@ MajorDeltaCompaction::MajorDeltaCompaction( base_data_(base_data), included_stores_(std::move(included_stores)), delta_iter_(std::move(delta_iter)), + tablet_id_(std::move(tablet_id)), redo_delta_mutations_written_(0), undo_delta_mutations_written_(0), state_(kInitialized) { @@ -241,7 +241,9 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas() { Status MajorDeltaCompaction::OpenBaseDataWriter() { CHECK(!base_data_writer_); - gscoped_ptr<MultiColumnWriter> w(new MultiColumnWriter(fs_manager_, &partial_schema_)); + gscoped_ptr<MultiColumnWriter> w(new MultiColumnWriter(fs_manager_, + &partial_schema_, + tablet_id_)); RETURN_NOT_OK(w->Open()); base_data_writer_.swap(w); return Status::OK(); @@ -249,7 +251,8 @@ Status MajorDeltaCompaction::OpenBaseDataWriter() { Status MajorDeltaCompaction::OpenRedoDeltaFileWriter() { unique_ptr<WritableBlock> block; - RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&block), + CreateBlockOptions opts({ tablet_id_ }); + RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(opts, &block), "Unable to create REDO delta output block"); new_redo_delta_block_ = block->id(); new_redo_delta_writer_.reset(new DeltaFileWriter(std::move(block))); @@ -258,7 +261,8 @@ Status MajorDeltaCompaction::OpenRedoDeltaFileWriter() { Status MajorDeltaCompaction::OpenUndoDeltaFileWriter() { unique_ptr<WritableBlock> block; - RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&block), + CreateBlockOptions opts({ tablet_id_ }); + RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(opts, &block), "Unable to create UNDO delta output block"); new_undo_delta_block_ = block->id(); new_undo_delta_writer_.reset(new DeltaFileWriter(std::move(block))); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/delta_compaction.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_compaction.h b/src/kudu/tablet/delta_compaction.h index 5f99f0c..6f0b788 100644 --- a/src/kudu/tablet/delta_compaction.h +++ b/src/kudu/tablet/delta_compaction.h @@ -54,7 +54,8 @@ class MajorDeltaCompaction { std::unique_ptr<DeltaIterator> delta_iter, std::vector<std::shared_ptr<DeltaStore> > included_stores, std::vector<ColumnId> col_ids, - HistoryGcOpts history_gc_opts); + HistoryGcOpts history_gc_opts, + std::string tablet_id); ~MajorDeltaCompaction(); // Executes the compaction. @@ -115,6 +116,9 @@ class MajorDeltaCompaction { // The merged view of the deltas from included_stores_. const std::unique_ptr<DeltaIterator> delta_iter_; + // The ID of the tablet being compacted. + const std::string tablet_id_; + // Outputs: gscoped_ptr<MultiColumnWriter> base_data_writer_; // The following two may not be initialized if we don't need to write a delta file. http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/delta_tracker.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc index eece75e..b293611 100644 --- a/src/kudu/tablet/delta_tracker.cc +++ b/src/kudu/tablet/delta_tracker.cc @@ -38,6 +38,7 @@ namespace kudu { namespace tablet { using cfile::ReaderOptions; +using fs::CreateBlockOptions; using fs::ReadableBlock; using fs::WritableBlock; using log::LogAnchorRegistry; @@ -347,7 +348,8 @@ Status DeltaTracker::CompactStores(int start_idx, int end_idx) { // Open a writer for the new destination delta block FsManager* fs = rowset_metadata_->fs_manager(); unique_ptr<WritableBlock> block; - RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&block), + CreateBlockOptions opts({ rowset_metadata_->tablet_metadata()->tablet_id() }); + RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(opts, &block), "Could not allocate delta block"); BlockId new_block_id(block->id()); @@ -613,7 +615,8 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms, // Open file for write. FsManager* fs = rowset_metadata_->fs_manager(); unique_ptr<WritableBlock> writable_block; - RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&writable_block), + CreateBlockOptions opts({ rowset_metadata_->tablet_metadata()->tablet_id() }); + RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(opts, &writable_block), "Unable to allocate new delta data writable_block"); BlockId block_id(writable_block->id()); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/deltafile-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc index e151c21..e353802 100644 --- a/src/kudu/tablet/deltafile-test.cc +++ b/src/kudu/tablet/deltafile-test.cc @@ -69,7 +69,7 @@ class TestDeltaFile : public KuduTest { void WriteTestFile(int min_timestamp = 0, int max_timestamp = 0) { unique_ptr<WritableBlock> block; - ASSERT_OK(fs_manager_->CreateNewBlock(&block)); + ASSERT_OK(fs_manager_->CreateNewBlock({}, &block)); test_block_ = block->id(); DeltaFileWriter dfw(std::move(block)); ASSERT_OK(dfw.Start()); @@ -219,7 +219,7 @@ TEST_F(TestDeltaFile, TestWriteDeltaFileIteratorToFile) { ASSERT_OK(s); unique_ptr<WritableBlock> block; - ASSERT_OK(fs_manager_->CreateNewBlock(&block)); + ASSERT_OK(fs_manager_->CreateNewBlock({}, &block)); BlockId block_id(block->id()); DeltaFileWriter dfw(std::move(block)); ASSERT_OK(dfw.Start()); @@ -366,7 +366,7 @@ TEST_F(TestDeltaFile, TestLazyInit) { // Finish() will return Status::Aborted(). TEST_F(TestDeltaFile, TestEmptyFileIsAborted) { unique_ptr<WritableBlock> block; - ASSERT_OK(fs_manager_->CreateNewBlock(&block)); + ASSERT_OK(fs_manager_->CreateNewBlock({}, &block)); test_block_ = block->id(); { DeltaFileWriter dfw(std::move(block)); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/deltamemstore-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc index e0c59f0..b61e1a1 100644 --- a/src/kudu/tablet/deltamemstore-test.cc +++ b/src/kudu/tablet/deltamemstore-test.cc @@ -168,7 +168,7 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) { // Flush the delta file so that the stats get updated. unique_ptr<WritableBlock> block; - ASSERT_OK(fs_manager_->CreateNewBlock(&block)); + ASSERT_OK(fs_manager_->CreateNewBlock({}, &block)); DeltaFileWriter dfw(std::move(block)); ASSERT_OK(dfw.Start()); gscoped_ptr<DeltaStats> stats; http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/diskrowset.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc index 4a33fde..0884a91 100644 --- a/src/kudu/tablet/diskrowset.cc +++ b/src/kudu/tablet/diskrowset.cc @@ -64,6 +64,7 @@ namespace kudu { namespace tablet { using cfile::BloomFileWriter; +using fs::CreateBlockOptions; using fs::ScopedWritableBlockCloser; using fs::WritableBlock; using log::LogAnchorRegistry; @@ -89,7 +90,8 @@ Status DiskRowSetWriter::Open() { TRACE_EVENT0("tablet", "DiskRowSetWriter::Open"); FsManager* fs = rowset_metadata_->fs_manager(); - col_writer_.reset(new MultiColumnWriter(fs, schema_)); + const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id(); + col_writer_.reset(new MultiColumnWriter(fs, schema_, tablet_id)); RETURN_NOT_OK(col_writer_->Open()); // Open bloom filter. @@ -107,7 +109,9 @@ Status DiskRowSetWriter::InitBloomFileWriter() { TRACE_EVENT0("tablet", "DiskRowSetWriter::InitBloomFileWriter"); unique_ptr<WritableBlock> block; FsManager* fs = rowset_metadata_->fs_manager(); - RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&block), + const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id(); + RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(CreateBlockOptions({ tablet_id }), + &block), "Couldn't allocate a block for bloom filter"); rowset_metadata_->set_bloom_block(block->id()); @@ -120,7 +124,9 @@ Status DiskRowSetWriter::InitAdHocIndexWriter() { TRACE_EVENT0("tablet", "DiskRowSetWriter::InitAdHocIndexWriter"); unique_ptr<WritableBlock> block; FsManager* fs = rowset_metadata_->fs_manager(); - RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&block), + const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id(); + RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(CreateBlockOptions({ tablet_id }), + &block), "Couldn't allocate a block for compoound index"); rowset_metadata_->set_adhoc_index_block(block->id()); @@ -317,8 +323,10 @@ Status RollingDiskRowSetWriter::RollWriter() { FsManager* fs = tablet_metadata_->fs_manager(); unique_ptr<WritableBlock> undo_data_block; unique_ptr<WritableBlock> redo_data_block; - RETURN_NOT_OK(fs->CreateNewBlock(&undo_data_block)); - RETURN_NOT_OK(fs->CreateNewBlock(&redo_data_block)); + RETURN_NOT_OK(fs->CreateNewBlock(CreateBlockOptions({ tablet_metadata_->tablet_id() }), + &undo_data_block)); + RETURN_NOT_OK(fs->CreateNewBlock(CreateBlockOptions({ tablet_metadata_->tablet_id() }), + &redo_data_block)); cur_undo_ds_block_id_ = undo_data_block->id(); cur_redo_ds_block_id_ = redo_data_block->id(); cur_undo_writer_.reset(new DeltaFileWriter(std::move(undo_data_block))); @@ -586,7 +594,8 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids, std::move(delta_iter), std::move(included_stores), col_ids, - std::move(history_gc_opts))); + std::move(history_gc_opts), + rowset_metadata_->tablet_metadata()->tablet_id())); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/metadata.proto ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto index 2e5bb01..83bffbc 100644 --- a/src/kudu/tablet/metadata.proto +++ b/src/kudu/tablet/metadata.proto @@ -123,6 +123,12 @@ message TabletSuperBlockPB { // WAL before tombstoning. // Only relevant for TOMBSTONED tablets. optional consensus.OpId tombstone_last_logged_opid = 12; + + // Tablet data is spread across a data directory group. If this is not set + // and the tablet state is TABLET_DATA_READY, it is assumed that the data is + // from a version of Kudu before 1.5.0. In this case, a new group will be + // created spanning all data directories. + optional DataDirGroupPB data_dir_group = 15; } // The enum of tablet states. http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/multi_column_writer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc index 7b19df8..3500283 100644 --- a/src/kudu/tablet/multi_column_writer.cc +++ b/src/kudu/tablet/multi_column_writer.cc @@ -27,15 +27,18 @@ namespace kudu { namespace tablet { using cfile::CFileWriter; +using fs::CreateBlockOptions; using fs::ScopedWritableBlockCloser; using fs::WritableBlock; using std::unique_ptr; MultiColumnWriter::MultiColumnWriter(FsManager* fs, - const Schema* schema) + const Schema* schema, + const string& tablet_id) : fs_(fs), schema_(schema), - finished_(false) { + finished_(false), + tablet_id_(tablet_id) { } MultiColumnWriter::~MultiColumnWriter() { @@ -46,6 +49,7 @@ Status MultiColumnWriter::Open() { CHECK(cfile_writers_.empty()); // Open columns. + const CreateBlockOptions block_opts({ tablet_id_ }); for (int i = 0; i < schema_->num_columns(); i++) { const ColumnSchema &col = schema_->column(i); @@ -70,7 +74,7 @@ Status MultiColumnWriter::Open() { // Open file for write. unique_ptr<WritableBlock> block; - RETURN_NOT_OK_PREPEND(fs_->CreateNewBlock(&block), + RETURN_NOT_OK_PREPEND(fs_->CreateNewBlock(block_opts, &block), "Unable to open output file for column " + col.ToString()); BlockId block_id(block->id()); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/multi_column_writer.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/multi_column_writer.h b/src/kudu/tablet/multi_column_writer.h index 71a0691..7b527eb 100644 --- a/src/kudu/tablet/multi_column_writer.h +++ b/src/kudu/tablet/multi_column_writer.h @@ -17,10 +17,12 @@ #ifndef KUDU_TABLET_MULTI_COLUMN_WRITER_H #define KUDU_TABLET_MULTI_COLUMN_WRITER_H -#include <glog/logging.h> #include <map> +#include <string> #include <vector> +#include <glog/logging.h> + #include "kudu/common/schema.h" #include "kudu/fs/fs_manager.h" #include "kudu/gutil/macros.h" @@ -41,11 +43,12 @@ class ScopedWritableBlockCloser; namespace tablet { // Wrapper which writes several columns in parallel corresponding to some -// Schema. +// Schema. Written blocks will fall in the tablet_id's data dir group. class MultiColumnWriter { public: MultiColumnWriter(FsManager* fs, - const Schema* schema); + const Schema* schema, + const std::string& tablet_id); virtual ~MultiColumnWriter(); @@ -85,6 +88,8 @@ class MultiColumnWriter { bool finished_; + const std::string tablet_id_; + std::vector<cfile::CFileWriter *> cfile_writers_; std::vector<BlockId> block_ids_; http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/tablet_bootstrap-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc index 49c3c04..0a0d5e9 100644 --- a/src/kudu/tablet/tablet_bootstrap-test.cc +++ b/src/kudu/tablet/tablet_bootstrap-test.cc @@ -27,6 +27,7 @@ #include "kudu/consensus/log_util.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/opid_util.h" +#include "kudu/fs/data_dirs.h" #include "kudu/server/logical_clock.h" #include "kudu/util/logging_test_util.h" #include "kudu/tablet/tablet_bootstrap.h" @@ -213,6 +214,7 @@ TEST_F(BootstrapTest, TestIncompleteTabletCopy) { ASSERT_OK(PersistTestTabletMetadataState(TABLET_DATA_COPYING)); shared_ptr<Tablet> tablet; ConsensusBootstrapInfo boot_info; + fs_manager_->dd_manager()->DeleteDataDirGroup(log::kTestTablet); Status s = BootstrapTestTablet(-1, -1, &tablet, &boot_info); ASSERT_TRUE(s.IsCorruption()) << "Expected corruption: " << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "TabletMetadata bootstrap state is TABLET_DATA_COPYING"); @@ -261,6 +263,10 @@ TEST_F(BootstrapTest, TestOrphanCommit) { log::SegmentSequence segments; ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); fs_manager_->env()->DeleteFile(segments[0]->path()); + + // Untrack the tablet in the data dir manager so upon the next call to + // BootstrapTestTablet, the tablet metadata's data dir group can be loaded. + fs_manager_->dd_manager()->DeleteDataDirGroup(tablet->tablet_id()); } { shared_ptr<Tablet> tablet; http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tablet/tablet_metadata.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index 24cd33d..f7a951d 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -27,6 +27,8 @@ #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/opid_util.h" +#include "kudu/fs/block_manager.h" +#include "kudu/fs/data_dirs.h" #include "kudu/gutil/atomicops.h" #include "kudu/gutil/bind.h" #include "kudu/gutil/dynamic_annotations.h" @@ -38,6 +40,7 @@ #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" #include "kudu/util/pb_util.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/trace.h" @@ -54,11 +57,11 @@ using std::shared_ptr; using base::subtle::Barrier_AtomicIncrement; using strings::Substitute; -using kudu::consensus::MinimumOpId; -using kudu::consensus::OpId; -using kudu::consensus::RaftConfigPB; - namespace kudu { + +using consensus::MinimumOpId; +using consensus::OpId; + namespace tablet { const int64 kNoDurableMemStore = -1; @@ -82,6 +85,11 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager, return Status::AlreadyPresent("Tablet already exists", tablet_id); } + RETURN_NOT_OK_PREPEND(fs_manager->dd_manager()->CreateDataDirGroup(tablet_id), + "Failed to create TabletMetadata"); + auto dir_group_cleanup = MakeScopedCleanup([&]() { + fs_manager->dd_manager()->DeleteDataDirGroup(tablet_id); + }); scoped_refptr<TabletMetadata> ret(new TabletMetadata(fs_manager, tablet_id, table_name, @@ -91,6 +99,8 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager, partition, initial_tablet_data_state)); RETURN_NOT_OK(ret->Flush()); + dir_group_cleanup.cancel(); + metadata->swap(ret); return Status::OK(); } @@ -189,9 +199,22 @@ Status TabletMetadata::DeleteTabletData(TabletDataState delete_type, } } + // Keep a copy of the old data dir group in case of flush failure. + DataDirGroupPB pb; + bool old_group_exists = fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_, &pb); + + // Remove the tablet's data dir group tracked by the DataDirManager. + fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_); + auto revert_group_cleanup = MakeScopedCleanup([&]() { + if (old_group_exists) { + fs_manager_->dd_manager()->LoadDataDirGroupFromPB(tablet_id_, pb); + } + }); + // Flushing will sync the new tablet_data_state_ to disk and will now also // delete all the data. RETURN_NOT_OK(Flush()); + revert_group_cleanup.cancel(); // Re-sync to disk one more time. // This call will typically re-sync with an empty orphaned blocks list @@ -357,6 +380,21 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) } AddOrphanedBlocksUnlocked(orphaned_blocks); + if (superblock.has_data_dir_group()) { + RETURN_NOT_OK_PREPEND(fs_manager_->dd_manager()->LoadDataDirGroupFromPB( + tablet_id_, superblock.data_dir_group()), "Failed to load DataDirGroup from superblock"); + } else if (tablet_data_state_ == TABLET_DATA_READY) { + // If the superblock does not contain a DataDirGroup, this server has + // likely been upgraded from before 1.5.0. Create a new DataDirGroup for + // the tablet. If the data is not TABLET_DATA_READY, group creation is + // pointless, as the tablet metadata will be deleted anyway. + // + // Since we don't know what directories the existing blocks are in, we + // should assume the data is spread across all disks. + RETURN_NOT_OK(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_, + fs::DataDirManager::DirDistributionMode::ACROSS_ALL_DIRS)); + } + if (superblock.has_tombstone_last_logged_opid()) { tombstone_last_logged_opid_ = superblock.tombstone_last_logged_opid(); } else { @@ -519,6 +557,7 @@ Status TabletMetadata::ReplaceSuperBlock(const TabletSuperBlockPB &pb) { { MutexLock l(flush_lock_); RETURN_NOT_OK_PREPEND(ReplaceSuperBlockUnlocked(pb), "Unable to replace superblock"); + fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_); } RETURN_NOT_OK_PREPEND(LoadFromSuperBlock(pb), @@ -584,6 +623,13 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, block_id.CopyToPB(pb.mutable_orphaned_blocks()->Add()); } + // Serialize the tablet's DataDirGroupPB if one exists. One may not exist if + // this is called during a tablet deletion. + DataDirGroupPB group_pb; + if (fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_, &group_pb)) { + pb.mutable_data_dir_group()->Swap(&group_pb); + } + super_block->Swap(&pb); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tools/kudu-tool-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index b5a73a8..b7d4190 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -690,7 +690,7 @@ TEST_F(ToolTest, TestFsDumpCFile) { ASSERT_OK(fs.Open()); unique_ptr<WritableBlock> block; - ASSERT_OK(fs.CreateNewBlock(&block)); + ASSERT_OK(fs.CreateNewBlock({}, &block)); BlockId block_id = block->id(); StringDataGenerator<false> generator("hello %04d"); WriterOptions opts; http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tserver/tablet_copy_client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc index b14de82..2497644 100644 --- a/src/kudu/tserver/tablet_copy_client-test.cc +++ b/src/kudu/tserver/tablet_copy_client-test.cc @@ -21,6 +21,7 @@ #include <glog/stl_logging.h> #include "kudu/consensus/quorum_util.h" +#include "kudu/fs/block_manager.h" #include "kudu/gutil/strings/fastmem.h" #include "kudu/tablet/tablet_bootstrap.h" #include "kudu/tserver/tablet_copy_client.h" @@ -231,7 +232,7 @@ INSTANTIATE_TEST_CASE_P(BlockDeleteTriggers, void TabletCopyClientAbortTest::CreateTestBlocks(int num_blocks) { for (int i = 0; i < num_blocks; i++) { unique_ptr<fs::WritableBlock> block; - ASSERT_OK(fs_manager_->CreateNewBlock(&block)); + ASSERT_OK(fs_manager_->CreateNewBlock({}, &block)); block->Append("Test"); ASSERT_OK(block->Close()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tserver/tablet_copy_client.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc index 5968de6..239c224 100644 --- a/src/kudu/tserver/tablet_copy_client.cc +++ b/src/kudu/tserver/tablet_copy_client.cc @@ -26,6 +26,7 @@ #include "kudu/consensus/metadata.pb.h" #include "kudu/fs/block_id.h" #include "kudu/fs/block_manager.h" +#include "kudu/fs/data_dirs.h" #include "kudu/fs/fs_manager.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/strings/util.h" @@ -74,11 +75,8 @@ namespace kudu { namespace tserver { using consensus::ConsensusMetadata; -using consensus::ConsensusStatePB; -using consensus::OpId; -using consensus::RaftConfigPB; -using consensus::RaftPeerPB; using env_util::CopyFile; +using fs::CreateBlockOptions; using fs::WritableBlock; using rpc::Messenger; using std::shared_ptr; @@ -208,6 +206,10 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, superblock_->clear_rowsets(); superblock_->clear_orphaned_blocks(); + // The UUIDs within the DataDirGroupPB on the remote are also unique to the + // remote and have no meaning to us. + superblock_->clear_data_dir_group(); + // Set the data state to COPYING to indicate that, on crash, this replica // should be discarded. superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING); @@ -242,6 +244,7 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, RETURN_NOT_OK_PREPEND( TSTabletManager::DeleteTabletData(meta_, tablet::TABLET_DATA_COPYING, boost::none), "Could not replace superblock with COPYING data state"); + CHECK_OK(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_)); } else { Partition partition; Partition::FromPB(superblock_->partition(), &partition); @@ -259,6 +262,8 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr, tablet::TABLET_DATA_COPYING, &meta_)); } + CHECK(fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_, + superblock_->mutable_data_dir_group())); state_ = kStarted; if (meta) { @@ -544,7 +549,7 @@ Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id, VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString(); unique_ptr<WritableBlock> block; - RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&block), + RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(CreateBlockOptions({ tablet_id_ }), &block), "Unable to create new block"); DataIdPB data_id; http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tserver/tablet_copy_client.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h index 33ffa20..9a61985 100644 --- a/src/kudu/tserver/tablet_copy_client.h +++ b/src/kudu/tserver/tablet_copy_client.h @@ -92,6 +92,9 @@ class TabletCopyClient { // in progress. If the 'metadata' pointer is passed as NULL, it is ignored, // otherwise the TabletMetadata object resulting from the initial remote // bootstrap response is returned. + // + // Upon success, tablet metadata will be created and the tablet will be + // assigned to a data directory group. Status Start(const HostPort& copy_source_addr, scoped_refptr<tablet::TabletMetadata>* meta); http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/tserver/ts_tablet_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index e4a676c..1553fca 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -506,6 +506,10 @@ void TSTabletManager::RunTabletCopy( // Tombstone the tablet and store the last-logged OpId. old_replica->Shutdown(); + // Note that this leaves the data dir manager without any references to + // tablet_id. This is okay because the tablet_copy_client should + // generate a new disk group during the call to Start(). + // // TODO(mpercy): Because we begin shutdown of the tablet after we check our // last-logged term against the leader's term, there may be operations // in flight and it may be possible for the same check in the tablet @@ -1055,21 +1059,21 @@ Status TSTabletManager::HandleNonReadyTabletOnStartup(const scoped_refptr<Tablet } Status TSTabletManager::DeleteTabletData(const scoped_refptr<TabletMetadata>& meta, - TabletDataState data_state, + TabletDataState delete_type, const boost::optional<OpId>& last_logged_opid) { const string& tablet_id = meta->tablet_id(); LOG(INFO) << LogPrefix(tablet_id, meta->fs_manager()) << "Deleting tablet data with delete state " - << TabletDataState_Name(data_state); - CHECK(data_state == TABLET_DATA_DELETED || - data_state == TABLET_DATA_TOMBSTONED || - data_state == TABLET_DATA_COPYING) - << "Unexpected data_state to delete tablet " << meta->tablet_id() << ": " - << TabletDataState_Name(data_state) << " (" << data_state << ")"; + << TabletDataState_Name(delete_type); + CHECK(delete_type == TABLET_DATA_DELETED || + delete_type == TABLET_DATA_TOMBSTONED || + delete_type == TABLET_DATA_COPYING) + << "Unexpected delete_type to delete tablet " << meta->tablet_id() << ": " + << TabletDataState_Name(delete_type) << " (" << delete_type << ")"; // Note: Passing an unset 'last_logged_opid' will retain the last_logged_opid // that was previously in the metadata. - RETURN_NOT_OK(meta->DeleteTabletData(data_state, last_logged_opid)); + RETURN_NOT_OK(meta->DeleteTabletData(delete_type, last_logged_opid)); LOG(INFO) << LogPrefix(tablet_id, meta->fs_manager()) << "Tablet deleted. Last logged OpId: " << meta->tombstone_last_logged_opid(); @@ -1080,13 +1084,13 @@ Status TSTabletManager::DeleteTabletData(const scoped_refptr<TabletMetadata>& me // We do not delete the superblock or the consensus metadata when tombstoning // a tablet or marking it as entering the tablet copy process. - if (data_state == TABLET_DATA_COPYING || - data_state == TABLET_DATA_TOMBSTONED) { + if (delete_type == TABLET_DATA_COPYING || + delete_type == TABLET_DATA_TOMBSTONED) { return Status::OK(); } // Only TABLET_DATA_DELETED tablets get this far. - DCHECK_EQ(TABLET_DATA_DELETED, data_state); + DCHECK_EQ(TABLET_DATA_DELETED, delete_type); RETURN_NOT_OK(ConsensusMetadata::DeleteOnDiskData(meta->fs_manager(), meta->tablet_id())); MAYBE_FAULT(FLAGS_fault_crash_after_cmeta_deleted); return meta->DeleteSuperBlock();
