This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 69172e8 fs: move file cache to server
69172e8 is described below
commit 69172e80ce2c1c1eaed19242d08037372f38ed19
Author: Adar Dembo <[email protected]>
AuthorDate: Fri Jan 10 16:49:36 2020 -0800
fs: move file cache to server
To use the file cache in the WAL, it must be hoisted out from under the
block manager into a more central location. I chose ServerBase rather
than FsManager because some tests use multiple FsManagers.
To avoid excessive plumbing in tests that use FsManager, I revived the
block manager code paths that operate without a file cache. They add little
complexity, and the PREDICT_TRUE macros should partly mitigate the cost of
the extra branches. Both the cached and non-cached code paths have test coverage:
1. Block manager tests explicitly instantiate a file cache.
2. All other non-server tests do not.
3. Server tests use the server's file cache.
Change-Id: Ice92c3622c954b06b773c58d51f08082010d7de3
Reviewed-on: http://gerrit.cloudera.org:8080/15011
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/fs/block_manager-stress-test.cc | 55 +++++++--------
src/kudu/fs/block_manager-test.cc | 19 +++---
src/kudu/fs/block_manager.cc | 63 -----------------
src/kudu/fs/block_manager.h | 6 --
src/kudu/fs/file_block_manager.cc | 20 ++++--
src/kudu/fs/file_block_manager.h | 5 +-
src/kudu/fs/fs_manager.cc | 10 +--
src/kudu/fs/fs_manager.h | 7 ++
src/kudu/fs/log_block_manager-test.cc | 48 ++++++++-----
src/kudu/fs/log_block_manager.cc | 114 ++++++++++++++++++++-----------
src/kudu/fs/log_block_manager.h | 5 +-
src/kudu/server/server_base.cc | 52 +++++++++++++-
src/kudu/server/server_base.h | 4 ++
13 files changed, 230 insertions(+), 178 deletions(-)
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 828c10a..9ea3798 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -49,7 +49,9 @@
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/file_cache.h"
#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
@@ -61,7 +63,6 @@
DECLARE_bool(cache_force_single_shard);
DECLARE_double(log_container_excess_space_before_cleanup_fraction);
DECLARE_double(log_container_live_metadata_before_compact_ratio);
-DECLARE_int64(block_manager_max_open_files);
DECLARE_uint64(log_container_max_size);
DECLARE_uint64(log_container_preallocate_bytes);
@@ -84,6 +85,7 @@ DEFINE_int32(num_inconsistencies, 16,
DEFINE_string(block_manager_paths, "", "Comma-separated list of paths to "
"use for block storage. If empty, will use the default unit "
"test path");
+DEFINE_int32(max_open_files, 32, "Maximum size of the test's file cache");
using std::string;
using std::shared_ptr;
@@ -117,23 +119,21 @@ template <typename T>
class BlockManagerStressTest : public KuduTest {
public:
BlockManagerStressTest() :
- rand_seed_(SeedRandom()),
- stop_latch_(1),
- test_error_manager_(new FsErrorManager()),
- test_tablet_name_("test_tablet"),
- total_blocks_written_(0),
- total_bytes_written_(0),
- total_blocks_read_(0),
- total_bytes_read_(0),
- total_blocks_deleted_(0) {
+ rand_seed_(SeedRandom()),
+ stop_latch_(1),
+ file_cache_("test_cache", env_, FLAGS_max_open_files,
+ scoped_refptr<MetricEntity>()),
+ test_tablet_name_("test_tablet"),
+ total_blocks_written_(0),
+ total_bytes_written_(0),
+ total_blocks_read_(0),
+ total_bytes_read_(0),
+ total_blocks_deleted_(0) {
// Increase the number of containers created.
FLAGS_log_container_max_size = 1 * 1024 * 1024;
FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;
- // Ensure the file cache is under stress too.
- FLAGS_block_manager_max_open_files = 32;
-
// Maximize the amount of cleanup triggered by the extra space heuristic.
FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
@@ -146,16 +146,13 @@ class BlockManagerStressTest : public KuduTest {
// Defer block manager creation until after the above flags are set.
bm_.reset(CreateBlockManager());
- bm_->Open(nullptr);
- dd_manager_->CreateDataDirGroup(test_tablet_name_);
+ CHECK_OK(file_cache_.Init());
+ CHECK_OK(bm_->Open(nullptr));
+ CHECK_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
CHECK_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_,
&test_group_pb_));
}
virtual void TearDown() override {
- // Ensure the proper destructor order. The directory manager must outlive
- // the block manager.
- bm_.reset();
-
// If non-standard paths were provided we need to delete them in between
// test runs.
if (!FLAGS_block_manager_paths.empty()) {
@@ -164,7 +161,6 @@ class BlockManagerStressTest : public KuduTest {
Substitute("Couldn't recursively delete $0", dd));
}
}
- dd_manager_.reset();
}
BlockManager* CreateBlockManager() {
@@ -188,8 +184,8 @@ class BlockManagerStressTest : public KuduTest {
CHECK_OK(DataDirManager::OpenExistingForTests(env_, data_dirs,
DataDirManagerOptions(), &dd_manager_));
}
- return new T(env_, dd_manager_.get(), test_error_manager_.get(),
- BlockManagerOptions());
+ return new T(env_, dd_manager_.get(), &error_manager_,
+ &file_cache_, BlockManagerOptions());
}
void RunTest(double secs) {
@@ -266,14 +262,13 @@ class BlockManagerStressTest : public KuduTest {
// Protects written_blocks_.
simple_spinlock lock_;
- // The block manager.
- unique_ptr<BlockManager> bm_;
-
- // The directory manager.
unique_ptr<DataDirManager> dd_manager_;
- // The error manager.
- unique_ptr<FsErrorManager> test_error_manager_;
+ FsErrorManager error_manager_;
+
+ FileCache file_cache_;
+
+ unique_ptr<BlockManager> bm_;
// Test group of disk to spread data across.
DataDirGroupPB test_group_pb_;
@@ -484,7 +479,7 @@ void BlockManagerStressTest<T>::DeleterThread() {
template <>
int BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
- return FLAGS_block_manager_max_open_files +
+ return FLAGS_max_open_files +
// Each open block exists outside the file cache.
(FLAGS_num_writer_threads * FLAGS_block_group_size *
FLAGS_block_group_number) +
// Each reader thread can open a file outside the cache if its lookup
@@ -495,7 +490,7 @@ int
BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
template <>
int BlockManagerStressTest<LogBlockManager>::GetMaxFdCount() const {
- return FLAGS_block_manager_max_open_files +
+ return FLAGS_max_open_files +
// If all containers are full, each open block could theoretically
// result in a new container, which is two files briefly outside the
// cache (before they are inserted and evict other cached files).
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 1acb0d2..404c591 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -50,6 +50,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/array_view.h" // IWYU pragma: keep
#include "kudu/util/env.h"
+#include "kudu/util/file_cache.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -119,11 +120,12 @@ template <typename T>
class BlockManagerTest : public KuduTest {
public:
BlockManagerTest() :
- test_tablet_name_("test_tablet"),
- test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
- test_error_manager_(new FsErrorManager()),
- bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
- shared_ptr<MemTracker>())) {
+ test_tablet_name_("test_tablet"),
+ test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
+ file_cache_("test_cache", env_, 1, scoped_refptr<MetricEntity>()),
+ bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
+ shared_ptr<MemTracker>())) {
+ CHECK_OK(file_cache_.Init());
}
virtual void SetUp() override {
@@ -167,8 +169,8 @@ class BlockManagerTest : public KuduTest {
BlockManagerOptions opts;
opts.metric_entity = metric_entity;
opts.parent_mem_tracker = parent_mem_tracker;
- return new T(env_, this->dd_manager_.get(), test_error_manager_.get(),
- std::move(opts));
+ return new T(env_, this->dd_manager_.get(), &error_manager_,
+ &file_cache_, std::move(opts));
}
Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
@@ -231,8 +233,9 @@ class BlockManagerTest : public KuduTest {
DataDirGroupPB test_group_pb_;
string test_tablet_name_;
CreateBlockOptions test_block_opts_;
- unique_ptr<FsErrorManager> test_error_manager_;
+ FsErrorManager error_manager_;
unique_ptr<DataDirManager> dd_manager_;
+ FileCache file_cache_;
unique_ptr<T> bm_;
};
diff --git a/src/kudu/fs/block_manager.cc b/src/kudu/fs/block_manager.cc
index 7156c91..4efeb0a 100644
--- a/src/kudu/fs/block_manager.cc
+++ b/src/kudu/fs/block_manager.cc
@@ -17,19 +17,9 @@
#include "kudu/fs/block_manager.h"
-#include <algorithm>
-#include <mutex>
-#include <ostream>
-
#include <gflags/gflags.h>
-#include <glog/logging.h>
-#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
-#include "kudu/gutil/strings/numbers.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/env.h"
-#include "kudu/util/faststring.h"
#include "kudu/util/flag_tags.h"
// The default value is optimized for throughput in the case that
@@ -53,64 +43,11 @@ DEFINE_string(block_manager_preflush_control, "finalize",
"never be pre-flushed but still be flushed when closed.");
TAG_FLAG(block_manager_preflush_control, experimental);
-DEFINE_int64(block_manager_max_open_files, -1,
- "Maximum number of open file descriptors to be used for data "
- "blocks. If -1, Kudu will automatically calculate this value. "
- "This is a soft limit. It is an error to use a value of 0.");
-TAG_FLAG(block_manager_max_open_files, advanced);
-TAG_FLAG(block_manager_max_open_files, evolving);
-
-static bool ValidateMaxOpenFiles(const char* /*flagname*/, int64_t value) {
- if (value == 0 || value < -1) {
- LOG(ERROR) << "Invalid max open files: cannot be " << value;
- return false;
- }
- return true;
-}
-DEFINE_validator(block_manager_max_open_files, &ValidateMaxOpenFiles);
-
-using strings::Substitute;
-
namespace kudu {
namespace fs {
BlockManagerOptions::BlockManagerOptions()
: read_only(false) {}
-int64_t GetFileCacheCapacityForBlockManager(Env* env) {
- // Maximize this process' open file limit first, if possible.
- static std::once_flag once;
- std::call_once(once, [&]() {
- env->IncreaseResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
- });
-
- uint64_t rlimit =
- env->GetResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
- // See block_manager_max_open_files.
- if (FLAGS_block_manager_max_open_files == -1) {
- // Use file-max as a possible upper bound.
- faststring buf;
- uint64_t buf_val;
- if (ReadFileToString(env, "/proc/sys/fs/file-max", &buf).ok() &&
- safe_strtou64(buf.ToString(), &buf_val)) {
- rlimit = std::min(rlimit, buf_val);
- }
-
- // Callers of this function expect a signed 64-bit integer, and rlimit
- // is an uint64_t type, so we need to avoid overflow.
- // The percentage we currently use is 40% by default, and although in fact
- // 40% of any value of the `uint64_t` type must be less than `kint64max`,
- // but the percentage may be adjusted in the future, such as to 60%, so to
- // prevent accidental overflow, we cap rlimit here.
- return std::min((rlimit / 5) * 2, static_cast<uint64_t>(kint64max));
- }
- LOG_IF(FATAL, FLAGS_block_manager_max_open_files > rlimit) <<
- Substitute(
- "Configured open file limit (block_manager_max_open_files) $0 "
- "exceeds process open file limit (ulimit) $1",
- FLAGS_block_manager_max_open_files, rlimit);
- return FLAGS_block_manager_max_open_files;
-}
-
} // namespace fs
} // namespace kudu
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 43dbeac..87239c3 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -30,10 +30,8 @@
namespace kudu {
class BlockId;
-class Env;
class MemTracker;
class Slice;
-
template <typename T>
class ArrayView;
@@ -318,9 +316,5 @@ class BlockDeletionTransaction {
virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) = 0;
};
-// Compute an upper bound for a file cache embedded within a block manager
-// using resource limits obtained from the system.
-int64_t GetFileCacheCapacityForBlockManager(Env* env);
-
} // namespace fs
} // namespace kudu
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index cef55c1..0c5796c 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -695,13 +695,13 @@ bool FileBlockManager::FindBlockPath(const BlockId&
block_id,
FileBlockManager::FileBlockManager(Env* env,
DataDirManager* dd_manager,
FsErrorManager* error_manager,
+ FileCache* file_cache,
BlockManagerOptions opts)
: env_(DCHECK_NOTNULL(env)),
dd_manager_(dd_manager),
error_manager_(DCHECK_NOTNULL(error_manager)),
opts_(std::move(opts)),
- file_cache_("fbm", env_, GetFileCacheCapacityForBlockManager(env_),
- opts_.metric_entity),
+ file_cache_(file_cache),
rand_(GetRandomSeed32()),
next_block_id_(rand_.Next64()),
mem_tracker_(MemTracker::CreateTracker(-1,
@@ -716,8 +716,6 @@ FileBlockManager::~FileBlockManager() {
}
Status FileBlockManager::Open(FsReport* report) {
- RETURN_NOT_OK(file_cache_.Init());
-
// Prepare the filesystem report and either return or log it.
FsReport local_report;
set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -830,7 +828,13 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
VLOG(1) << "Opening block with id " << block_id.ToString() << " at " << path;
shared_ptr<RandomAccessFile> reader;
- RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_.OpenExistingFile(path, &reader));
+ if (PREDICT_TRUE(file_cache_)) {
+ RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenExistingFile(path,
&reader));
+ } else {
+ unique_ptr<RandomAccessFile> r;
+ RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(path, &r));
+ reader.reset(r.release());
+ }
block->reset(new internal::FileReadableBlock(this, block_id, reader));
return Status::OK();
}
@@ -854,7 +858,11 @@ Status FileBlockManager::DeleteBlock(const BlockId&
block_id) {
return Status::NotFound(
Substitute("Block $0 not found", block_id.ToString()));
}
- RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_.DeleteFile(path));
+ if (PREDICT_TRUE(file_cache_)) {
+ RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->DeleteFile(path));
+ } else {
+ RETURN_NOT_OK_FBM_DISK_FAILURE(env_->DeleteFile(path));
+ }
// We don't bother fsyncing the parent directory as there's nothing to be
// gained by ensuring that the deletion is made durable. Even if we did
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 032c672..38b0653 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -27,7 +27,6 @@
#include "kudu/fs/block_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/util/atomic.h"
-#include "kudu/util/file_cache.h"
#include "kudu/util/locks.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
@@ -36,6 +35,7 @@ namespace kudu {
class BlockId;
class Env;
+class FileCache;
class MemTracker;
namespace fs {
@@ -74,6 +74,7 @@ class FileBlockManager : public BlockManager {
FileBlockManager(Env* env,
DataDirManager* dd_manager,
FsErrorManager* error_manager,
+ FileCache* file_cache,
BlockManagerOptions opts);
virtual ~FileBlockManager();
@@ -134,7 +135,7 @@ class FileBlockManager : public BlockManager {
const BlockManagerOptions opts_;
// Manages files opened for reading.
- FileCache file_cache_;
+ FileCache* file_cache_;
// For generating block IDs.
ThreadSafeRandom rand_;
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 62d7ab1..a60b922 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -140,7 +140,8 @@ FsManagerOpts::FsManagerOpts()
metadata_root(FLAGS_fs_metadata_dir),
block_manager_type(FLAGS_block_manager),
read_only(false),
- update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {
+ update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES),
+ file_cache(nullptr) {
data_roots = strings::Split(FLAGS_fs_data_dirs, ",", strings::SkipEmpty());
}
@@ -149,7 +150,8 @@ FsManagerOpts::FsManagerOpts(const string& root)
data_roots({ root }),
block_manager_type(FLAGS_block_manager),
read_only(false),
- update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {}
+ update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES),
+ file_cache(nullptr) {}
FsManager::FsManager(Env* env, FsManagerOpts opts)
: env_(DCHECK_NOTNULL(env)),
@@ -299,10 +301,10 @@ void FsManager::InitBlockManager() {
bm_opts.read_only = opts_.read_only;
if (opts_.block_manager_type == "file") {
block_manager_.reset(new FileBlockManager(
- env_, dd_manager_.get(), error_manager_.get(), std::move(bm_opts)));
+ env_, dd_manager_.get(), error_manager_.get(), opts_.file_cache,
std::move(bm_opts)));
} else {
block_manager_.reset(new LogBlockManager(
- env_, dd_manager_.get(), error_manager_.get(), std::move(bm_opts)));
+ env_, dd_manager_.get(), error_manager_.get(), opts_.file_cache,
std::move(bm_opts)));
}
}
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 796c145..c719be7 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -44,6 +44,7 @@ DECLARE_bool(enable_data_block_fsync);
namespace kudu {
class BlockId;
+class FileCache;
class InstanceMetadataPB;
class MemTracker;
@@ -123,6 +124,12 @@ struct FsManagerOpts {
//
// Defaults to UPDATE_AND_IGNORE_FAILURES.
fs::UpdateInstanceBehavior update_instances;
+
+ // The file cache to be used for long-lived opened files (e.g. in the block
+ // manager). If null, opened files will not be cached.
+ //
+ // Defaults to null.
+ FileCache* file_cache;
};
// FsManager provides helpers to read data and metadata files,
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 7166516..e1e2676 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -55,6 +55,7 @@
#include "kudu/gutil/strings/util.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
+#include "kudu/util/file_cache.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
@@ -81,7 +82,6 @@ DECLARE_double(env_inject_eio);
DECLARE_double(log_container_excess_space_before_cleanup_fraction);
DECLARE_double(log_container_live_metadata_before_compact_ratio);
DECLARE_int32(fs_target_data_dirs_per_tablet);
-DECLARE_int64(block_manager_max_open_files);
DECLARE_int64(log_container_max_blocks);
DECLARE_string(block_manager_preflush_control);
DECLARE_string(env_inject_eio_globs);
@@ -113,10 +113,14 @@ class LogBlockContainer;
class LogBlockManagerTest : public KuduTest {
public:
LogBlockManagerTest() :
- test_tablet_name_("test_tablet"),
- test_block_opts_({ test_tablet_name_ }),
- test_error_manager_(new FsErrorManager()),
- bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+ test_tablet_name_("test_tablet"),
+ test_block_opts_({ test_tablet_name_ }),
+ // Use a small file cache (smaller than the number of containers).
+ //
+ // Not strictly necessary except for
TestDeleteFromContainerAfterMetadataCompaction.
+ file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()),
+ bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+ CHECK_OK(file_cache_.Init());
}
void SetUp() override {
@@ -137,10 +141,11 @@ class LogBlockManagerTest : public KuduTest {
CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs,
DataDirManagerOptions(), &dd_manager_));
}
+
BlockManagerOptions opts;
opts.metric_entity = metric_entity;
- return new LogBlockManager(env_, dd_manager_.get(),
test_error_manager_.get(),
- std::move(opts));
+ return new LogBlockManager(env_, dd_manager_.get(), &error_manager_,
+ &file_cache_, std::move(opts));
}
Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity =
nullptr,
@@ -224,7 +229,8 @@ class LogBlockManagerTest : public KuduTest {
CreateBlockOptions test_block_opts_;
unique_ptr<DataDirManager> dd_manager_;
- unique_ptr<FsErrorManager> test_error_manager_;
+ FsErrorManager error_manager_;
+ FileCache file_cache_;
unique_ptr<LogBlockManager> bm_;
private:
@@ -1506,10 +1512,14 @@ TEST_F(LogBlockManagerTest,
TestDeleteDeadContainersAtStartup) {
FLAGS_log_container_max_size = 0;
// Create one container.
- unique_ptr<WritableBlock> block;
- ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
- ASSERT_OK(block->Append("a"));
- ASSERT_OK(block->Close());
+ BlockId block_id;
+ {
+ unique_ptr<WritableBlock> block;
+ ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
+ ASSERT_OK(block->Append("a"));
+ ASSERT_OK(block->Close());
+ block_id = block->id();
+ }
string data_file_name;
string metadata_file_name;
NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
@@ -1525,7 +1535,7 @@ TEST_F(LogBlockManagerTest,
TestDeleteDeadContainersAtStartup) {
{
shared_ptr<BlockDeletionTransaction> deletion_transaction =
this->bm_->NewDeletionTransaction();
- deletion_transaction->AddDeletedBlock(block->id());
+ deletion_transaction->AddDeletedBlock(block_id);
vector<BlockId> deleted;
ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
}
@@ -1602,8 +1612,6 @@ TEST_F(LogBlockManagerTest,
TestCompactFullContainerMetadataAtStartup) {
TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
// Compact aggressively.
FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
- // Use a small file cache (smaller than the number of containers).
- FLAGS_block_manager_max_open_files = 50;
// Use a single shard so that we have an accurate max cache capacity
// regardless of the number of cores on the machine.
FLAGS_cache_force_single_shard = true;
@@ -1681,7 +1689,7 @@ TEST_F(LogBlockManagerTest,
TestOpenWithFailedDirectories) {
DataDirManagerOptions(), &dd_manager_));
// Wire in a callback to fail data directories.
- test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+ error_manager_.SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
Bind(&DataDirManager::MarkDirFailedByUuid,
Unretained(dd_manager_.get())));
bm_.reset(CreateBlockManager(nullptr));
@@ -1990,6 +1998,10 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
};
const auto CreateMetadataFile = [&] () {
+ // We're often recreating an existing file, so we must invalidate any
+ // entry in the file cache first.
+ file_cache_.Invalidate(metadata_file_name);
+
unique_ptr<WritableFile> metadata_file_writer;
ASSERT_OK(env_->NewWritableFile(metadata_file_name,
&metadata_file_writer));
ASSERT_OK(metadata_file_writer->Append(Slice("a")));
@@ -1997,6 +2009,10 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
};
const auto CreateDataFile = [&] () {
+ // We're often recreating an existing file, so we must invalidate any
+ // entry in the file cache first.
+ file_cache_.Invalidate(data_file_name);
+
unique_ptr<WritableFile> data_file_writer;
ASSERT_OK(env_->NewWritableFile(data_file_name, &data_file_writer));
data_file_writer->Close();
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index b6c4c4b..b09f319 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -720,10 +720,21 @@ LogBlockContainer::~LogBlockContainer() {
CHECK(!block_manager_->opts_.read_only);
string data_file_name = data_file_->filename();
string metadata_file_name = metadata_file_->filename();
-
CONTAINER_DISK_FAILURE(block_manager_->file_cache_.DeleteFile(data_file_name),
- "Could not delete dead container data file " + data_file_name);
-
CONTAINER_DISK_FAILURE(block_manager_->file_cache_.DeleteFile(metadata_file_name),
- "Could not delete dead container metadata file " + metadata_file_name);
+ string data_failure_msg =
+ "Could not delete dead container data file " + data_file_name;
+ string metadata_failure_msg =
+ "Could not delete dead container metadata file " + metadata_file_name;
+ if (PREDICT_TRUE(block_manager_->file_cache_)) {
+
CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(data_file_name),
+ data_failure_msg);
+
CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(metadata_file_name),
+ metadata_failure_msg);
+ } else {
+ CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(data_file_name),
+ data_failure_msg);
+
CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(metadata_file_name),
+ metadata_failure_msg);
+ }
}
}
@@ -746,24 +757,28 @@ Status LogBlockContainer::Create(LogBlockManager*
block_manager,
string data_path;
Status metadata_status;
Status data_status;
- unique_ptr<RWFile> metadata_writer;
- unique_ptr<RWFile> data_file;
- RWFileOptions wr_opts;
- wr_opts.mode = Env::MUST_CREATE;
+ shared_ptr<RWFile> metadata_writer;
+ shared_ptr<RWFile> data_file;
// Repeat in the event of a container id collision (unlikely).
//
// When looping, we delete any created-and-orphaned files.
do {
+ unique_ptr<RWFile> rwf;
+
if (metadata_writer) {
block_manager->env()->DeleteFile(metadata_path);
}
common_path = JoinPathSegments(dir->dir(),
block_manager->oid_generator()->Next());
metadata_path = StrCat(common_path,
LogBlockManager::kContainerMetadataFileSuffix);
+ RWFileOptions wr_opts;
+ wr_opts.mode = Env::MUST_CREATE;
metadata_status = block_manager->env()->NewRWFile(wr_opts,
metadata_path,
- &metadata_writer);
+ &rwf);
+ metadata_writer.reset(rwf.release());
+
if (data_file) {
block_manager->env()->DeleteFile(data_path);
}
@@ -772,29 +787,27 @@ Status LogBlockContainer::Create(LogBlockManager*
block_manager,
rw_opts.mode = Env::MUST_CREATE;
data_status = block_manager->env()->NewRWFile(rw_opts,
data_path,
- &data_file);
+ &rwf);
+ data_file.reset(rwf.release());
} while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
data_status.IsAlreadyPresent()));
if (metadata_status.ok() && data_status.ok()) {
- unique_ptr<WritablePBContainerFile> metadata_file;
- shared_ptr<RWFile> cached_data_file;
-
- metadata_writer.reset();
- shared_ptr<RWFile> cached_metadata_writer;
-
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
- metadata_path, &cached_metadata_writer));
- metadata_file.reset(new WritablePBContainerFile(
- std::move(cached_metadata_writer)));
-
- data_file.reset();
-
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
- data_path, &cached_data_file));
+ if (PREDICT_TRUE(block_manager->file_cache_)) {
+ metadata_writer.reset();
+
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+ metadata_path, &metadata_writer));
+ data_file.reset();
+
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+ data_path, &data_file));
+ }
+ unique_ptr<WritablePBContainerFile> metadata_file(new
WritablePBContainerFile(
+ std::move(metadata_writer)));
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB()));
container->reset(new LogBlockContainer(block_manager,
dir,
std::move(metadata_file),
- std::move(cached_data_file)));
+ std::move(data_file)));
VLOG(1) << "Created log block container " << (*container)->ToString();
}
@@ -819,16 +832,28 @@ Status LogBlockContainer::Open(LogBlockManager*
block_manager,
// Open the existing metadata and data files for writing.
shared_ptr<RWFile> metadata_file;
-
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
- metadata_path, &metadata_file));
+ shared_ptr<RWFile> data_file;
+ if (PREDICT_TRUE(block_manager->file_cache_)) {
+
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+ metadata_path, &metadata_file));
+
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+ data_path, &data_file));
+ } else {
+ RWFileOptions opts;
+ opts.mode = Env::MUST_EXIST;
+ unique_ptr<RWFile> rwf;
+ RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
+ metadata_path, &rwf));
+ metadata_file.reset(rwf.release());
+ RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
+ data_path, &rwf));
+ data_file.reset(rwf.release());
+ }
+
unique_ptr<WritablePBContainerFile> metadata_pb_writer;
metadata_pb_writer.reset(new
WritablePBContainerFile(std::move(metadata_file)));
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_pb_writer->OpenExisting());
- shared_ptr<RWFile> data_file;
-
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
- data_path, &data_file));
-
uint64_t data_file_size;
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
@@ -1198,8 +1223,17 @@ Status LogBlockContainer::SyncMetadata() {
Status LogBlockContainer::ReopenMetadataWriter() {
shared_ptr<RWFile> f;
- RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_.OpenExistingFile(
- metadata_file_->filename(), &f));
+ if (PREDICT_TRUE(block_manager_->file_cache_)) {
+ RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenExistingFile(
+ metadata_file_->filename(), &f));
+ } else {
+ unique_ptr<RWFile> f_uniq;
+ RWFileOptions opts;
+ opts.mode = Env::MUST_EXIST;
+ RETURN_NOT_OK_HANDLE_ERROR(block_manager_->env_->NewRWFile(opts,
+ metadata_file_->filename(), &f_uniq));
+ f.reset(f_uniq.release());
+ }
unique_ptr<WritablePBContainerFile> w;
w.reset(new WritablePBContainerFile(std::move(f)));
RETURN_NOT_OK_HANDLE_ERROR(w->OpenExisting());
@@ -1909,6 +1943,7 @@ const map<int64_t, int64_t>
LogBlockManager::kPerFsBlockSizeBlockLimits({
LogBlockManager::LogBlockManager(Env* env,
DataDirManager* dd_manager,
FsErrorManager* error_manager,
+ FileCache* file_cache,
BlockManagerOptions opts)
: env_(DCHECK_NOTNULL(env)),
dd_manager_(DCHECK_NOTNULL(dd_manager)),
@@ -1917,8 +1952,7 @@ LogBlockManager::LogBlockManager(Env* env,
mem_tracker_(MemTracker::CreateTracker(-1,
"log_block_manager",
opts_.parent_mem_tracker)),
- file_cache_("lbm", env, GetFileCacheCapacityForBlockManager(env),
- opts_.metric_entity),
+ file_cache_(file_cache),
buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
next_block_id_(1) {
managed_block_shards_.resize(kBlockMapChunk);
@@ -1986,8 +2020,6 @@ LogBlockManager::~LogBlockManager() {
} while (false)
Status LogBlockManager::Open(FsReport* report) {
- RETURN_NOT_OK(file_cache_.Init());
-
// Establish (and log) block limits for each data directory using kernel,
// filesystem, and gflags information.
for (const auto& dd : dd_manager_->dirs()) {
@@ -2984,10 +3016,12 @@ Status LogBlockManager::RewriteMetadataFile(const
LogBlockContainer& container,
"could not get file size of temporary
metadata file");
RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->RenameFile(tmp_file_name,
metadata_file_name),
"could not rename temporary metadata
file");
- // Evict the old path from the file cache, so that when we re-open the new
- // metadata file for write, we don't accidentally get a cache hit on the
- // old file descriptor pointing to the now-deleted old version.
- file_cache_.Invalidate(metadata_file_name);
+ if (PREDICT_TRUE(file_cache_)) {
+ // Evict the old path from the file cache, so that when we re-open the new
+ // metadata file for write, we don't accidentally get a cache hit on the
+ // old file descriptor pointing to the now-deleted old version.
+ file_cache_->Invalidate(metadata_file_name);
+ }
tmp_deleter.cancel();
*file_bytes_delta = (static_cast<int64_t>(old_metadata_size) -
new_metadata_size);
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index fcdafac..91c95f3 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -37,7 +37,6 @@
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
-#include "kudu/util/file_cache.h"
#include "kudu/util/locks.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/oid_generator.h"
@@ -46,6 +45,7 @@
namespace kudu {
class Env;
+class FileCache;
namespace fs {
class DataDirManager;
@@ -189,6 +189,7 @@ class LogBlockManager : public BlockManager {
+ // 'file_cache' is stored as a raw (non-owning) pointer and may be null; when
+ // it is null, file-cache-specific steps such as evicting a rewritten metadata
+ // file's path in RewriteMetadataFile() are skipped.
LogBlockManager(Env* env,
DataDirManager* dd_manager,
FsErrorManager* error_manager,
+ FileCache* file_cache,
BlockManagerOptions opts);
virtual ~LogBlockManager();
@@ -441,7 +442,7 @@ class LogBlockManager : public BlockManager {
boost::optional<int64_t>> block_limits_by_data_dir_;
// Manages files opened for reading.
- FileCache file_cache_;
+ FileCache* file_cache_;
// Holds (and owns) all containers loaded from disk.
std::unordered_map<std::string,
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 7539bc0..dcca690 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -17,11 +17,12 @@
#include "kudu/server/server_base.h"
+#include <algorithm>
#include <cstdint>
#include <functional>
+#include <mutex>
#include <sstream>
#include <string>
-#include <utility>
#include <vector>
#include <boost/algorithm/string/predicate.hpp>
@@ -38,7 +39,9 @@
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/fs_report.h"
+#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
@@ -63,6 +66,8 @@
#include "kudu/server/webserver.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/file_cache.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/flag_validators.h"
#include "kudu/util/flags.h"
@@ -211,6 +216,11 @@ DEFINE_uint64(gc_tcmalloc_memory_interval_seconds, 30,
TAG_FLAG(gc_tcmalloc_memory_interval_seconds, advanced);
TAG_FLAG(gc_tcmalloc_memory_interval_seconds, runtime);
+// Capacity of the server-wide file cache. When non-zero, GetFileCacheCapacity()
+// uses this value directly and treats a value above the process open-files
+// limit as a fatal error; when 0, the capacity is derived from that limit.
+DEFINE_uint64(server_max_open_files, 0,
+ "Maximum number of open file descriptors. If 0, Kudu will "
+ "automatically calculate this value. This is a soft limit");
+TAG_FLAG(server_max_open_files, advanced);
+
DECLARE_bool(use_hybrid_clock);
DECLARE_int32(dns_resolver_max_threads_num);
DECLARE_uint32(dns_resolver_cache_capacity_mb);
@@ -352,6 +362,41 @@ shared_ptr<MemTracker> CreateMemTrackerForServer() {
return shared_ptr<MemTracker>(MemTracker::CreateTracker(-1, id_str));
}
+// Returns the capacity to use for the server-wide FileCache (it is passed to
+// the FileCache constructor in ServerBase::ServerBase()).
+//
+// On the first call, tries to raise this process' open-files resource limit;
+// std::call_once guards that step, so later calls skip it.
+//
+// If --server_max_open_files is 0, the capacity is 40% of the process
+// open-files limit, further bounded by /proc/sys/fs/file-max when that file
+// can be read and parsed. Otherwise the flag value is returned as-is, and a
+// value above the process open-files limit is a fatal error.
+//
+// NOTE(review): only the automatic branch is capped at kint64max. A flag value
+// above kint64max that does not exceed 'rlimit' would become negative when
+// returned as int64_t -- confirm whether GetResourceLimit() can ever report a
+// limit that large (e.g. an unlimited rlimit).
+int64_t GetFileCacheCapacity(Env* env) {
+ // Maximize this process' open file limit first, if possible.
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ env->IncreaseResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
+ });
+
+ uint64_t rlimit =
+ env->GetResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
+ // See server_max_open_files.
+ if (FLAGS_server_max_open_files == 0) {
+ // Use file-max as a possible upper bound.
+ faststring buf;
+ uint64_t buf_val;
+ if (ReadFileToString(env, "/proc/sys/fs/file-max", &buf).ok() &&
+ safe_strtou64(buf.ToString(), &buf_val)) {
+ rlimit = std::min(rlimit, buf_val);
+ }
+
+ // Callers of this function expect a signed 64-bit integer, but rlimit is
+ // a uint64_t, so the result must not overflow int64_t. At the current 40%
+ // ratio the result always fits, but the ratio may be raised in the future
+ // (e.g. to 60%), so the result is capped at kint64max to prevent an
+ // accidental overflow.
+ return std::min((rlimit / 5) * 2, static_cast<uint64_t>(kint64max));
+ }
+ LOG_IF(FATAL, FLAGS_server_max_open_files > rlimit) <<
+ Substitute(
+ "Configured open file limit (server_max_open_files) $0 "
+ "exceeds process open file limit (ulimit) $1",
+ FLAGS_server_max_open_files, rlimit);
+ return FLAGS_server_max_open_files;
+}
+
} // anonymous namespace
ServerBase::ServerBase(string name, const ServerBaseOptions& options,
@@ -362,6 +407,8 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
metric_registry_(new MetricRegistry()),
metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_.get(),
metric_namespace)),
+ file_cache_(new FileCache("file cache", options.env,
+ GetFileCacheCapacity(options.env),
metric_entity_)),
rpc_server_(new RpcServer(options.rpc_opts)),
result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>(
MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))),
@@ -380,6 +427,7 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
fs_opts.block_manager_type = options.fs_opts.block_manager_type;
fs_opts.wal_root = options.fs_opts.wal_root;
fs_opts.data_roots = options.fs_opts.data_roots;
+ fs_opts.file_cache = file_cache_.get();
fs_manager_.reset(new FsManager(options.env, std::move(fs_opts)));
if (FLAGS_use_hybrid_clock) {
@@ -444,6 +492,8 @@ Status ServerBase::Init() {
RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal,
FLAGS_keytab_file));
+ RETURN_NOT_OK(file_cache_->Init());
+
fs::FsReport report;
Status s = fs_manager_->Open(&report);
// No instance files existed. Try creating a new FS layout.
diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h
index 77e4c7d..c744007 100644
--- a/src/kudu/server/server_base.h
+++ b/src/kudu/server/server_base.h
@@ -31,6 +31,7 @@
namespace kudu {
class DnsResolver;
+class FileCache;
class FsManager;
class MemTracker;
class MetricEntity;
@@ -104,6 +105,8 @@ class ServerBase {
DnsResolver* dns_resolver() const { return dns_resolver_.get(); }
+ // Returns the server-wide file cache. The server owns it (file_cache_ is a
+ // unique_ptr), so callers receive a non-owning pointer. The same pointer is
+ // handed to the FsManager, and the cache is initialized in ServerBase::Init().
+ FileCache* file_cache() const { return file_cache_.get(); }
+
// Return a PB describing the status of the server (version info, bound ports, etc)
Status GetStatusPB(ServerStatusPB* status) const;
@@ -175,6 +178,7 @@ class ServerBase {
std::shared_ptr<MemTracker> mem_tracker_;
std::unique_ptr<MetricRegistry> metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
+ std::unique_ptr<FileCache> file_cache_;
std::unique_ptr<FsManager> fs_manager_;
std::unique_ptr<RpcServer> rpc_server_;
std::unique_ptr<Webserver> web_server_;