This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 21073b06ce30171b10ed09bee133fcafe650f022 Author: Adar Dembo <[email protected]> AuthorDate: Mon Jan 20 21:18:08 2020 -0800 file cache: support alternate open modes Previously, the file cache only supported opening existing files; any new file creation happened out-of-band and then the file was reopened via the cache. If we're going to use the file cache for log index chunks, however, we need to support CREATE_OR_OPEN-style usage, and doing it in the log index itself is somewhat hairy. This patch modifies the file cache to support most of the modes defined in Env::OpenMode. I tried to ensure that cache operations look and feel like a standard POSIX filesystem, but it's tough to get this right, and I'm sure I missed some corner cases. I feel pretty good about our use cases (block managers, log segments, and log index chunks) though. Change-Id: Ie167302ef85b8e1a40fbb89a7742e2cbb43bcec3 Reviewed-on: http://gerrit.cloudera.org:8080/15081 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/fs/file_block_manager.cc | 3 +- src/kudu/fs/log_block_manager.cc | 73 ++++++++-------- src/kudu/util/file_cache-stress-test.cc | 4 +- src/kudu/util/file_cache-test.cc | 90 ++++++++++++++++---- src/kudu/util/file_cache.cc | 142 ++++++++++++++++++++++++-------- src/kudu/util/file_cache.h | 49 ++++++++--- 6 files changed, 259 insertions(+), 102 deletions(-) diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc index 0c5796c..09b2a99 100644 --- a/src/kudu/fs/file_block_manager.cc +++ b/src/kudu/fs/file_block_manager.cc @@ -829,7 +829,8 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id, shared_ptr<RandomAccessFile> reader; if (PREDICT_TRUE(file_cache_)) { - RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenExistingFile(path, &reader)); + RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenFile<Env::MUST_EXIST>( + path, &reader)); } else { unique_ptr<RandomAccessFile> r; 
RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(path, &r)); diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index b09f319..cb40778 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -764,42 +764,47 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager, // // When looping, we delete any created-and-orphaned files. do { - unique_ptr<RWFile> rwf; - - if (metadata_writer) { - block_manager->env()->DeleteFile(metadata_path); - } common_path = JoinPathSegments(dir->dir(), block_manager->oid_generator()->Next()); metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix); - RWFileOptions wr_opts; - wr_opts.mode = Env::MUST_CREATE; - metadata_status = block_manager->env()->NewRWFile(wr_opts, - metadata_path, - &rwf); - metadata_writer.reset(rwf.release()); - - if (data_file) { - block_manager->env()->DeleteFile(data_path); - } data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix); - RWFileOptions rw_opts; - rw_opts.mode = Env::MUST_CREATE; - data_status = block_manager->env()->NewRWFile(rw_opts, - data_path, - &rwf); - data_file.reset(rwf.release()); + + if (PREDICT_TRUE(block_manager->file_cache_)) { + if (metadata_writer) { + WARN_NOT_OK(block_manager->file_cache_->DeleteFile(metadata_path), + "could not delete orphaned metadata file thru file cache"); + } + if (data_file) { + WARN_NOT_OK(block_manager->file_cache_->DeleteFile(data_path), + "could not delete orphaned data file thru file cache"); + } + metadata_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>( + metadata_path, &metadata_writer); + data_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>( + data_path, &data_file); + } else { + if (metadata_writer) { + WARN_NOT_OK(block_manager->env()->DeleteFile(metadata_path), + "could not delete orphaned metadata file"); + } + if (data_file) { + WARN_NOT_OK(block_manager->env()->DeleteFile(data_path), + "could 
not delete orphaned data file"); + } + unique_ptr<RWFile> rwf; + RWFileOptions rw_opts; + + rw_opts.mode = Env::MUST_CREATE; + metadata_status = block_manager->env()->NewRWFile( + rw_opts, metadata_path, &rwf); + metadata_writer.reset(rwf.release()); + data_status = block_manager->env()->NewRWFile( + rw_opts, data_path, &rwf); + data_file.reset(rwf.release()); + } } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() || data_status.IsAlreadyPresent())); if (metadata_status.ok() && data_status.ok()) { - if (PREDICT_TRUE(block_manager->file_cache_)) { - metadata_writer.reset(); - RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile( - metadata_path, &metadata_writer)); - data_file.reset(); - RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile( - data_path, &data_file)); - } unique_ptr<WritablePBContainerFile> metadata_file(new WritablePBContainerFile( std::move(metadata_writer))); RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB())); @@ -834,10 +839,10 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager, shared_ptr<RWFile> metadata_file; shared_ptr<RWFile> data_file; if (PREDICT_TRUE(block_manager->file_cache_)) { - RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile( - metadata_path, &metadata_file)); - RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile( - data_path, &data_file)); + RETURN_NOT_OK_CONTAINER_DISK_FAILURE( + block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(metadata_path, &metadata_file)); + RETURN_NOT_OK_CONTAINER_DISK_FAILURE( + block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(data_path, &data_file)); } else { RWFileOptions opts; opts.mode = Env::MUST_EXIST; @@ -1224,7 +1229,7 @@ Status LogBlockContainer::SyncMetadata() { Status LogBlockContainer::ReopenMetadataWriter() { shared_ptr<RWFile> f; if (PREDICT_TRUE(block_manager_->file_cache_)) { - 
RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenExistingFile( + RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenFile<Env::MUST_EXIST>( metadata_file_->filename(), &f)); } else { unique_ptr<RWFile> f_uniq; diff --git a/src/kudu/util/file_cache-stress-test.cc b/src/kudu/util/file_cache-stress-test.cc index 8fa1b74..bd3fd22 100644 --- a/src/kudu/util/file_cache-stress-test.cc +++ b/src/kudu/util/file_cache-stress-test.cc @@ -172,13 +172,13 @@ class FileCacheStressTest : public KuduTest { } if (HasPrefixString(BaseName(to_open), kRWFPrefix)) { shared_ptr<RWFile> rwf; - TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &rwf)); + TEST_CHECK_OK(cache_->OpenFile<Env::MUST_EXIST>(to_open, &rwf)); rwfs.emplace_back(std::move(rwf)); } else { CHECK(HasPrefixString(BaseName(to_open), kRAFPrefix)); shared_ptr<RandomAccessFile> raf; - TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &raf)); + TEST_CHECK_OK(cache_->OpenFile<Env::MUST_EXIST>(to_open, &raf)); rafs.emplace_back(std::move(raf)); } FinishedOpen(to_open); diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc index 1fbfc38..01ecfd5 100644 --- a/src/kudu/util/file_cache-test.cc +++ b/src/kudu/util/file_cache-test.cc @@ -121,7 +121,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) { // Open a non-existent file. { shared_ptr<TypeParam> f; - ASSERT_TRUE(this->cache_->OpenExistingFile( + ASSERT_TRUE(this->cache_->template OpenFile<Env::MUST_EXIST>( "/does/not/exist", &f).IsNotFound()); NO_FATALS(this->AssertFdsAndDescriptors(0, 0)); } @@ -139,7 +139,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) { { // Open a test file. It should open an fd and create a descriptor. shared_ptr<TypeParam> f1; - ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f1)); NO_FATALS(this->AssertFdsAndDescriptors(1, 1)); // Spot check the test data by comparing sizes. 
@@ -153,7 +153,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) { // Open the same file a second time. It should reuse the existing // descriptor and not open a second fd. shared_ptr<TypeParam> f2; - ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f2)); NO_FATALS(this->AssertFdsAndDescriptors(1, 1)); { auto uh(this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE)); @@ -163,7 +163,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) { // Open a second file. This will create a new descriptor, but evict the fd // opened for the first file, so the fd count should remain constant. shared_ptr<TypeParam> f3; - ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f3)); NO_FATALS(this->AssertFdsAndDescriptors(1, 2)); { auto uh(this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE)); @@ -207,12 +207,12 @@ TYPED_TEST(FileCacheTest, TestDeletion) { ASSERT_TRUE(this->env_->FileExists(kFile2)); { shared_ptr<TypeParam> f1; - ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f1)); ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds()); ASSERT_OK(this->cache_->DeleteFile(kFile2)); { shared_ptr<TypeParam> f2; - ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound()); + ASSERT_TRUE(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f2).IsNotFound()); } ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound()); ASSERT_TRUE(this->env_->FileExists(kFile2)); @@ -229,7 +229,7 @@ TYPED_TEST(FileCacheTest, TestDeletion) { ASSERT_OK(this->WriteTestFile(kFile3, kData3)); { shared_ptr<TypeParam> f3; - ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile3, &f3)); } ASSERT_TRUE(this->env_->FileExists(kFile3)); ASSERT_EQ(this->initial_open_fds_, 
this->CountOpenFds()); @@ -245,7 +245,7 @@ TYPED_TEST(FileCacheTest, TestInvalidation) { // Open the file. shared_ptr<TypeParam> f; - ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f)); // Write a new file and rename it in place on top of file1. const string kFile2 = this->GetTestPath("foo2"); @@ -266,7 +266,7 @@ TYPED_TEST(FileCacheTest, TestInvalidation) { // But if we re-open the path again, the new descriptor should read the // new data. shared_ptr<TypeParam> f2; - ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f2)); ASSERT_OK(f2->Size(&size)); ASSERT_EQ(kData2.size(), size); } @@ -291,7 +291,7 @@ TYPED_TEST(FileCacheTest, TestHeavyReads) { string filename = this->GetTestPath(Substitute("$0", i)); ASSERT_OK(this->WriteTestFile(filename, data)); shared_ptr<TypeParam> f; - ASSERT_OK(this->cache_->OpenExistingFile(filename, &f)); + ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(filename, &f)); opened_files.push_back(f); } @@ -328,7 +328,7 @@ TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) { threads.emplace_back([&]() { for (int i = 0; i < 10000; i++) { shared_ptr<TypeParam> f; - CHECK_OK(this->cache_->OpenExistingFile(kFile, &f)); + CHECK_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile, &f)); } }); } @@ -346,13 +346,67 @@ TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) { ASSERT_OK(this->WriteTestFile(kFile, "test data")); shared_ptr<RandomAccessFile> f; - ASSERT_OK(cache_->OpenExistingFile(kFile, &f)); + ASSERT_OK(cache_->OpenFile<Env::MUST_EXIST>(kFile, &f)); // This used to crash due to a kudu_malloc_usable_size() call on a memory // address that wasn't the start of an actual heap allocation. 
LOG(INFO) << f->memory_footprint(); } +class RWFileCacheTest : public FileCacheTest<RWFile> { +}; + +TEST_F(RWFileCacheTest, TestOpenMustCreate) { + const string kFile1 = this->GetTestPath("foo"); + const string kFile2 = this->GetTestPath("bar"); + + { + shared_ptr<RWFile> rwf1; + ASSERT_OK(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf1)); + NO_FATALS(AssertFdsAndDescriptors(1, 1)); + + // If there's already a descriptor, a second open will fail in the file cache. + shared_ptr<RWFile> rwf2; + ASSERT_TRUE(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf2).IsAlreadyPresent()); + + // Now let's evict kFile1. + shared_ptr<RWFile> rwf3; + ASSERT_OK(cache_->OpenFile<Env::MUST_CREATE>(kFile2, &rwf3)); + NO_FATALS(AssertFdsAndDescriptors(1, 2)); + + // The reopen of kFile1 shouldn't be with MUST_CREATE; otherwise this would fail. + ASSERT_OK(rwf1->Sync()); + } + { + // Without any existing descriptors, open will fail in the filesystem. + NO_FATALS(AssertFdsAndDescriptors(0, 0)); + shared_ptr<RWFile> rwf; + ASSERT_TRUE(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf).IsAlreadyPresent()); + } +} + +TEST_F(RWFileCacheTest, TestOpenCreateOrOpen) { + const string kFile1 = this->GetTestPath("foo"); + const string kFile2 = this->GetTestPath("bar"); + + shared_ptr<RWFile> rwf1; + ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile1, &rwf1)); + + // If there's already a descriptor, a second open will also succeed. + shared_ptr<RWFile> rwf2; + ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile1, &rwf2)); + + // Now let's evict kFile1. + shared_ptr<RWFile> rwf3; + ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile2, &rwf3)); + NO_FATALS(AssertFdsAndDescriptors(1, 2)); + + // The reopen of kFile1 should use MUST_EXIST. If we delete the file out + // from under the cache, we can see this in action as the reopen fails. 
+ ASSERT_OK(env_->DeleteFile(kFile1)); + ASSERT_TRUE(rwf1->Sync().IsNotFound()); +} + class MixedFileCacheTest : public KuduTest { }; @@ -376,9 +430,9 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) { // Open the test files, each as a different file type. shared_ptr<RWFile> rwf; - ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf)); + ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile1, &rwf)); shared_ptr<RandomAccessFile> raf; - ASSERT_OK(cache.OpenExistingFile(kFile2, &raf)); + ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile2, &raf)); // Verify the correct file contents for each test file. uint64_t size; @@ -397,13 +451,13 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) { // // These checks are expensive so they're only done in DEBUG mode. shared_ptr<RWFile> rwf2; - ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf2)); + ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile1, &rwf2)); shared_ptr<RandomAccessFile> raf2; - ASSERT_OK(cache.OpenExistingFile(kFile2, &raf2)); + ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile2, &raf2)); #ifndef NDEBUG - ASSERT_DEATH({ cache.OpenExistingFile(kFile1, &raf); }, + ASSERT_DEATH({ cache.OpenFile<Env::MUST_EXIST>(kFile1, &raf); }, "!FindDescriptorUnlocked"); - ASSERT_DEATH({ cache.OpenExistingFile(kFile2, &rwf); }, + ASSERT_DEATH({ cache.OpenFile<Env::MUST_EXIST>(kFile2, &rwf); }, "!FindDescriptorUnlocked"); #endif } diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc index 8fb0adf..0bbbfe7 100644 --- a/src/kudu/util/file_cache.cc +++ b/src/kudu/util/file_cache.cc @@ -240,55 +240,55 @@ class Descriptor<RWFile> : public RWFile { Status Read(uint64_t offset, Slice result) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Read(offset, result); } Status ReadV(uint64_t offset, ArrayView<Slice> results) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); - 
RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->ReadV(offset, results); } Status Write(uint64_t offset, const Slice& data) override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Write(offset, data); } Status WriteV(uint64_t offset, ArrayView<const Slice> data) override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->WriteV(offset, data); } Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->PreAllocate(offset, length, mode); } Status Truncate(uint64_t length) override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Truncate(length); } Status PunchHole(uint64_t offset, size_t length) override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->PunchHole(offset, length); } Status Flush(FlushMode mode, uint64_t offset, size_t length) override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Flush(mode, offset, length); } Status Sync() override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + 
RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Sync(); } @@ -299,13 +299,13 @@ class Descriptor<RWFile> : public RWFile { Status Size(uint64_t* size) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->Size(size); } Status GetExtentMap(ExtentMap* out) const override { ScopedOpenedDescriptor<RWFile> opened(&base_); - RETURN_NOT_OK(ReopenFileIfNecessary(&opened)); + RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened)); return opened.file()->GetExtentMap(out); } @@ -316,14 +316,17 @@ class Descriptor<RWFile> : public RWFile { private: friend class ::kudu::FileCache; + template <Env::OpenMode Mode> Status Init() { - return once_.Init(&Descriptor<RWFile>::InitOnce, this); + return once_.Init(&Descriptor<RWFile>::InitOnce<Mode>, this); } + template <Env::OpenMode Mode> Status InitOnce() { - return ReopenFileIfNecessary(nullptr); + return ReopenFileIfNecessary<Mode>(nullptr); } + template <Env::OpenMode Mode> Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const { ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache()); CHECK(!base_.invalidated()); @@ -337,7 +340,7 @@ class Descriptor<RWFile> : public RWFile { // The file was evicted, reopen it. 
RWFileOptions opts; - opts.mode = Env::MUST_EXIST; + opts.mode = Mode; unique_ptr<RWFile> f; RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f)); @@ -488,50 +491,112 @@ Status FileCache::Init() { } template <> -Status FileCache::OpenExistingFile(const string& file_name, - shared_ptr<RWFile>* file) { +Status FileCache::DoOpenFile(const string& file_name, + shared_ptr<internal::Descriptor<RWFile>>* file, + bool* created_desc) { shared_ptr<internal::Descriptor<RWFile>> d; + bool cd; { std::lock_guard<simple_spinlock> l(lock_); - d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &rwf_descs_); + d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, + &rwf_descs_, &cd); DCHECK(d); +#ifndef NDEBUG // Enforce the invariant that a particular file name may only be used by one // descriptor at a time. This is expensive so it's only done in DEBUG mode. - DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_)); + bool ignored; + CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, + &raf_descs_, &ignored)); +#endif } if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); } - - // Check that the underlying file can be opened (no-op for found descriptors). - // - // Done outside the lock. 
- RETURN_NOT_OK(d->Init()); *file = std::move(d); + *created_desc = cd; return Status::OK(); } template <> -Status FileCache::OpenExistingFile(const string& file_name, - shared_ptr<RandomAccessFile>* file) { +Status FileCache::DoOpenFile(const string& file_name, + shared_ptr<internal::Descriptor<RandomAccessFile>>* file, + bool* created_desc) { shared_ptr<internal::Descriptor<RandomAccessFile>> d; + bool cd; { std::lock_guard<simple_spinlock> l(lock_); - d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &raf_descs_); + d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, + &raf_descs_, &cd); DCHECK(d); +#ifndef NDEBUG // Enforce the invariant that a particular file name may only be used by one // descriptor at a time. This is expensive so it's only done in DEBUG mode. - DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_)); + bool ignored; + CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, + &rwf_descs_, &ignored)); +#endif } if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); } + *file = std::move(d); + *created_desc = cd; + return Status::OK(); +} + +template <> +Status FileCache::OpenFile<Env::CREATE_OR_OPEN>(const string& file_name, + shared_ptr<RWFile>* file) { + shared_ptr<internal::Descriptor<RWFile>> d; + bool ignored; + RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored)); + + // Check that the underlying file can be opened (no-op for found descriptors). 
+ RETURN_NOT_OK(d->Init<Env::CREATE_OR_OPEN>()); + *file = std::move(d); + return Status::OK(); +} + +template <> +Status FileCache::OpenFile<Env::MUST_CREATE>(const string& file_name, + shared_ptr<RWFile>* file) { + shared_ptr<internal::Descriptor<RWFile>> d; + bool created_desc; + RETURN_NOT_OK(DoOpenFile(file_name, &d, &created_desc)); + + if (!created_desc) { + return Status::AlreadyPresent("file already exists", file_name); + } + + // Check that the underlying file can be opened (no-op for found descriptors). + RETURN_NOT_OK(d->Init<Env::MUST_CREATE>()); + *file = std::move(d); + return Status::OK(); +} + +template <> +Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name, + shared_ptr<RWFile>* file) { + shared_ptr<internal::Descriptor<RWFile>> d; + bool ignored; + RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored)); + + // Check that the underlying file can be opened (no-op for found descriptors). + RETURN_NOT_OK(d->Init<Env::MUST_EXIST>()); + *file = std::move(d); + return Status::OK(); +} + +template <> +Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name, + shared_ptr<RandomAccessFile>* file) { + shared_ptr<internal::Descriptor<RandomAccessFile>> d; + bool ignored; + RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored)); // Check that the underlying file can be opened (no-op for found descriptors). - // - // Done outside the lock. RETURN_NOT_OK(d->Init()); *file = std::move(d); return Status::OK(); @@ -543,8 +608,10 @@ Status FileCache::DeleteFile(const string& file_name) { // descriptor in the first map. 
{ std::lock_guard<simple_spinlock> l(lock_); + bool ignored; { - auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_); + auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, + &rwf_descs_, &ignored); if (d) { if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); @@ -554,7 +621,8 @@ Status FileCache::DeleteFile(const string& file_name) { } } { - auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_); + auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, + &raf_descs_, &ignored); if (d) { if (d->base_.deleted()) { return Status::NotFound(kAlreadyDeleted, file_name); @@ -585,13 +653,14 @@ void FileCache::Invalidate(const string& file_name) { shared_ptr<internal::Descriptor<RandomAccessFile>> raf_desc; { std::lock_guard<simple_spinlock> l(lock_); + bool ignored; rwf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, - &rwf_descs_); + &rwf_descs_, &ignored); DCHECK(rwf_desc); rwf_desc->base_.MarkInvalidated(); raf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, - &raf_descs_); + &raf_descs_, &ignored); DCHECK(raf_desc); raf_desc->base_.MarkInvalidated(); } @@ -669,7 +738,8 @@ template <class FileType> shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked( const string& file_name, FindMode mode, - DescriptorMap<FileType>* descs) { + DescriptorMap<FileType>* descs, + bool* created_desc) { DCHECK(lock_.is_locked()); shared_ptr<internal::Descriptor<FileType>> d; @@ -682,6 +752,7 @@ shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked( // Descriptor is still valid, return it. VLOG(2) << "Found existing descriptor: " << file_name; + *created_desc = false; return d; } // Descriptor has expired; erase it and pretend we found nothing. 
@@ -692,6 +763,9 @@ shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked( d = std::make_shared<internal::Descriptor<FileType>>(this, file_name); EmplaceOrDie(descs, file_name, d); VLOG(2) << "Created new descriptor: " << file_name; + *created_desc = true; + } else { + *created_desc = false; } return d; } diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h index ed3663c..c22ee91 100644 --- a/src/kudu/util/file_cache.h +++ b/src/kudu/util/file_cache.h @@ -28,15 +28,12 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/util/cache.h" #include "kudu/util/countdown_latch.h" +#include "kudu/util/env.h" #include "kudu/util/locks.h" #include "kudu/util/status.h" namespace kudu { -class Env; -class RWFile; -class RandomAccessFile; - namespace internal { template <class FileType> @@ -63,7 +60,7 @@ class Thread; // The core of the client-facing API is the cache descriptor. A descriptor // uniquely identifies an opened file. To a client, a descriptor is just an // open file interface of the variety defined in util/env.h. Clients open -// descriptors via the OpenExistingFile() cache methods. +// descriptors via the OpenFile() cache methods. // // Descriptors are shared objects; an existing descriptor is handed back to a // client if a file with the same name is already opened. To facilitate @@ -118,12 +115,27 @@ class FileCache { // to a file-like interface but interfaces with the cache under the hood to // reopen a file as needed during file operations. // - // The underlying file is opened immediately to verify that it indeed exists, - // but may be closed later if the cache reaches its upper bound on the number - // of open files. It is also closed when the descriptor's last reference is dropped. 
- template <class FileType> - Status OpenExistingFile(const std::string& file_name, - std::shared_ptr<FileType>* file); + // The underlying file is opened immediately to respect 'Mode', but may be + // closed later if the cache reaches its upper bound on the number of open + // files. It is also closed when the descriptor's last reference is dropped. + // + // All file types honor a 'Mode' of MUST_EXIST. Some may honor other modes as + // well, although transparently reopening evicted files will always use + // MUST_EXIST. Different combinations of modes and file types are expressed as + // template specializations; if a file type doesn't support a particular mode, + // there will be a linker error. + // + // TODO(adar): The file cache tries to behave as if users were accessing the + // underlying POSIX filesystem directly, but its semantics aren't 100% correct + // when using modes other than MUST_EXIST. For example, the behavior of + // MUST_CREATE and CREATE_OR_OPEN isn't quite right for open files marked for + // deletion. In theory we should "unmark" such a file to indicate that it was + // recreated, and truncate it so it's empty for the second client, but the + // truncation would corrupt the file for the first client. In short, take + // great care when using any mode apart from MUST_EXIST. + template <Env::OpenMode Mode, class FileType> + Status OpenFile(const std::string& file_name, + std::shared_ptr<FileType>* file); // Deletes a file by name through the cache. // @@ -183,7 +195,10 @@ class FileCache { template <class FileType> static void ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs); - // Looks up a descriptor by file name. + // Looks up a descriptor by file name or creates a new one (if requested). + // + // The value of 'created_desc' will be set in accordance with whether a new + // descriptor was created. // // Must be called with 'lock_' held. 
enum class FindMode { @@ -197,11 +212,19 @@ class FileCache { std::shared_ptr<internal::Descriptor<FileType>> FindDescriptorUnlocked( const std::string& file_name, FindMode mode, - DescriptorMap<FileType>* descs); + DescriptorMap<FileType>* descs, + bool* created_desc); // Periodically removes expired descriptors from the descriptor maps. void RunDescriptorExpiry(); + // Actually opens the file as per OpenFile. Used to encapsulate the bulk of + // OpenFile because C++ prohibits partial specialization of template functions. + template <class FileType> + Status DoOpenFile(const std::string& file_name, + std::shared_ptr<FileType>* file, + bool* created_desc); + // Status message prefix for files that have already been marked as deleted. static const char* const kAlreadyDeleted;
