This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 411189a8d733a66c363c72f8c404123d68640a3e Author: Tim Armstrong <tarmstr...@cloudera.com> AuthorDate: Fri Aug 2 16:37:02 2019 -0700 IMPALA-8376: directory limits for scratch usage This extends the --scratch_dirs syntax to support specifying a max capacity per directory, similar to the --data_cache configuration. The capacity is delimited from the directory name with ":" and uses the usual syntax for specifying memory. The following are valid arguments: * --scratch_dirs=/dir1,/dir2 (no limits) * --scratch_dirs=/dir1,/dir2:25G (only a limit on /dir2) * --scratch_dirs=/dir1:5MB,/dir2 (only a limit on /dir1) * --scratch_dirs=/dir1:-1,/dir2:0 (alternative ways of expressing no limit) The usage is tracked with a metric per directory. An allocation from a directory fails if it would push that directory's usage past its limit. These metrics are exposed as tmp-file-mgr.scratch-space-bytes-used.dir-0, tmp-file-mgr.scratch-space-bytes-used.dir-1, etc. Also add support for parsing terabyte specifiers (e.g. 12tb) to ParseUtil::ParseMemSpec(), a utility function that is used for parsing many configurations. Testing: Added a unit test to exercise TmpFileMgr. Manually ran a spilling query on an impalad with multiple scratch dirs configured with different limits. Confirmed via metrics that the capacities were enforced. 
Change-Id: I696146a65dbb97f1ba200ae472358ae2db6eb441 Reviewed-on: http://gerrit.cloudera.org:8080/13986 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/runtime/tmp-file-mgr-internal.h | 15 ++- be/src/runtime/tmp-file-mgr-test.cc | 203 +++++++++++++++++++++++++++++++++ be/src/runtime/tmp-file-mgr.cc | 128 +++++++++++++++++---- be/src/runtime/tmp-file-mgr.h | 33 +++++- be/src/service/query-options-test.cc | 8 +- be/src/util/parse-util-test.cc | 11 ++ be/src/util/parse-util.cc | 6 + common/thrift/generate_error_codes.py | 3 +- common/thrift/metrics.json | 10 ++ docs/topics/impala_mem_limit.xml | 6 +- 10 files changed, 382 insertions(+), 41 deletions(-) diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h index 5d11c4d..090a019 100644 --- a/be/src/runtime/tmp-file-mgr-internal.h +++ b/be/src/runtime/tmp-file-mgr-internal.h @@ -36,11 +36,13 @@ class TmpFileMgr::File { public: File(FileGroup* file_group, DeviceId device_id, const std::string& path); - /// Allocates 'num_bytes' bytes in this file for a new block of data. - /// The file size is increased by a call to truncate() if necessary. - /// Sets 'offset' to the file offset of the first byte in the allocated + /// Allocates 'num_bytes' bytes in this file for a new block of data if there is + /// free capacity in this temporary directory. If there is insufficient capacity, + /// return false. Otherwise, update state and return true. + /// This function does not actually perform any file operations. + /// On success, sets 'offset' to the file offset of the first byte in the allocated /// range on success. - void AllocateSpace(int64_t num_bytes, int64_t* offset); + bool AllocateSpace(int64_t num_bytes, int64_t* offset); /// Called when an IO error is encountered for this file. Logs the error and blacklists /// the file. 
@@ -85,7 +87,10 @@ class TmpFileMgr::File { /// Set to true to indicate that we shouldn't allocate any more space in this file. bool blacklisted_; + + /// Helper to get the TmpDir that this file is associated with. + TmpDir* GetDir(); }; -} +} // namespace impala #endif diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 5bed3b4..9b852a6 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -17,6 +17,7 @@ #include <cstdio> #include <cstdlib> +#include <limits> #include <numeric> #include <boost/filesystem.hpp> @@ -52,6 +53,11 @@ namespace impala { using namespace io; +static const int64_t KILOBYTE = 1024L; +static const int64_t MEGABYTE = 1024L * KILOBYTE; +static const int64_t GIGABYTE = 1024L * MEGABYTE; +static const int64_t TERABYTE = 1024L * GIGABYTE; + class TmpFileMgrTest : public ::testing::Test { public: virtual void SetUp() { @@ -76,6 +82,18 @@ class TmpFileMgrTest : public ::testing::Test { DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); } + /// Helper to create a TmpFileMgr and initialise it with InitCustom(). Adds the mgr to + /// 'obj_pool_' for automatic cleanup at the end of each test. Fails the test if + /// InitCustom() fails. + TmpFileMgr* CreateTmpFileMgr(const string& tmp_dirs_spec) { + // Allocate a new metrics group for each TmpFileMgr so they don't get confused by + // the pre-existing metrics (TmpFileMgr assumes it's a singleton in product code). + MetricGroup* metrics = obj_pool_.Add(new MetricGroup("")); + TmpFileMgr* mgr = obj_pool_.Add(new TmpFileMgr()); + EXPECT_OK(mgr->InitCustom(tmp_dirs_spec, false, metrics)); + return mgr; + } + /// Check that metric values are consistent with TmpFileMgr state. 
void CheckMetrics(TmpFileMgr* tmp_file_mgr) { vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices(); @@ -122,6 +140,11 @@ class TmpFileMgrTest : public ::testing::Test { return Status::OK(); } + /// Helper to get the private tmp_dirs_ member. + static const vector<TmpFileMgr::TmpDir>& GetTmpDirs(TmpFileMgr* mgr) { + return mgr->tmp_dirs_; + } + /// Helper to call the private TmpFileMgr::NewFile() method. static void NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group, TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) { @@ -644,4 +667,184 @@ TEST_F(TmpFileMgrTest, TestHWMMetric) { file_group_2.Close(); checkHWMMetrics(0, 2 * LIMIT); } + +// Test that usage per directory is tracked correctly and per-directory limits are +// enforced. Sets up several scratch directories, some with limits, and checks +// that the allocations occur in the right directories. +TEST_F(TmpFileMgrTest, TestDirectoryLimits) { + vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2", + "/tmp/tmp-file-mgr-test.3"}); + vector<string> tmp_dir_specs({"/tmp/tmp-file-mgr-test.1:512", + "/tmp/tmp-file-mgr-test.2:1k", "/tmp/tmp-file-mgr-test.3"}); + RemoveAndCreateDirs(tmp_dirs); + TmpFileMgr tmp_file_mgr; + ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get())); + + TmpFileMgr::FileGroup file_group_1( + &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId()); + TmpFileMgr::FileGroup file_group_2( + &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId()); + + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group_1, &files)); + ASSERT_OK(CreateFiles(&file_group_2, &files)); + + IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>( + "tmp-file-mgr.scratch-space-bytes-used.dir-0"); + IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>( + "tmp-file-mgr.scratch-space-bytes-used.dir-1"); + IntGauge* dir3_usage = 
metrics_->FindMetricForTesting<IntGauge>( + "tmp-file-mgr.scratch-space-bytes-used.dir-2"); + + // A power-of-two so that FileGroup allocates exactly this amount of scratch space. + const int64_t ALLOC_SIZE = 512; + int64_t offset; + TmpFileMgr::File* alloc_file; + + // Allocate three times - once per directory. We expect these allocations to go through + // so we should have one allocation in each directory. + SetNextAllocationIndex(&file_group_1, 0); + for (int i = 0; i < tmp_dir_specs.size(); ++i) { + ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset)); + } + EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue()); + EXPECT_EQ(ALLOC_SIZE, dir2_usage->GetValue()); + EXPECT_EQ(ALLOC_SIZE, dir3_usage->GetValue()); + + // This time we should hit the limit on the first directory. Do this from a + // different file group to show that limits are enforced across file groups. + for (int i = 0; i < 2; ++i) { + ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset)); + } + EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue()); + EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue()); + EXPECT_EQ(2 * ALLOC_SIZE, dir3_usage->GetValue()); + + // Now we're at the limits on two directories, all allocations should got to the + // last directory without a limit. + for (int i = 0; i < 100; ++i) { + ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset)); + } + EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue()); + EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue()); + EXPECT_EQ(102 * ALLOC_SIZE, dir3_usage->GetValue()); + + file_group_2.Close(); + // Metrics should be decremented when the file groups delete the underlying files. + EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue()); + EXPECT_EQ(ALLOC_SIZE, dir2_usage->GetValue()); + EXPECT_EQ(ALLOC_SIZE, dir3_usage->GetValue()); + + // We should be able to reuse the space freed up. 
+ ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset)); + + EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue()); + file_group_1.Close(); + EXPECT_EQ(0, dir1_usage->GetValue()); + EXPECT_EQ(0, dir2_usage->GetValue()); + EXPECT_EQ(0, dir3_usage->GetValue()); +} + +// Test the case when all per-directory limits are hit. We expect to return a status +// and fail gracefully. +TEST_F(TmpFileMgrTest, TestDirectoryLimitsExhausted) { + vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"}); + vector<string> tmp_dir_specs( + {"/tmp/tmp-file-mgr-test.1:256kb", "/tmp/tmp-file-mgr-test.2:1mb"}); + const int64_t DIR1_LIMIT = 256L * 1024L; + const int64_t DIR2_LIMIT = 1024L * 1024L; + RemoveAndCreateDirs(tmp_dirs); + TmpFileMgr tmp_file_mgr; + ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get())); + + TmpFileMgr::FileGroup file_group_1( + &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId()); + TmpFileMgr::FileGroup file_group_2( + &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId()); + + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group_1, &files)); + ASSERT_OK(CreateFiles(&file_group_2, &files)); + + IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>( + "tmp-file-mgr.scratch-space-bytes-used.dir-0"); + IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>( + "tmp-file-mgr.scratch-space-bytes-used.dir-1"); + + // A power-of-two so that FileGroup allocates exactly this amount of scratch space. + const int64_t ALLOC_SIZE = 512; + const int64_t MAX_ALLOCATIONS = (DIR1_LIMIT + DIR2_LIMIT) / ALLOC_SIZE; + int64_t offset; + TmpFileMgr::File* alloc_file; + + // Allocate exactly the maximum total capacity of the directories. 
+ SetNextAllocationIndex(&file_group_1, 0); + for (int i = 0; i < MAX_ALLOCATIONS; ++i) { + ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset)); + } + EXPECT_EQ(DIR1_LIMIT, dir1_usage->GetValue()); + EXPECT_EQ(DIR2_LIMIT, dir2_usage->GetValue()); + // The directories are at capacity, so allocations should fail. + Status err1 = GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset); + ASSERT_EQ(err1.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED); + Status err2 = GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset); + ASSERT_EQ(err2.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED); + + // A FileGroup should recover once allocations are released, i.e. it does not + // permanently block allocating files from the group. + file_group_1.Close(); + ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset)); + file_group_2.Close(); +} + +// Test the directory parsing logic, including the various error cases. +TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) { + RemoveAndCreateDirs({"/tmp/tmp-file-mgr-test1", "/tmp/tmp-file-mgr-test2", + "/tmp/tmp-file-mgr-test3", "/tmp/tmp-file-mgr-test4", "/tmp/tmp-file-mgr-test5", + "/tmp/tmp-file-mgr-test6", "/tmp/tmp-file-mgr-test7"}); + // Configure various directories with valid formats. + auto& dirs = GetTmpDirs( + CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:5g,/tmp/tmp-file-mgr-test2," + "/tmp/tmp-file-mgr-test3:1234,/tmp/tmp-file-mgr-test4:99999999," + "/tmp/tmp-file-mgr-test5:200tb,/tmp/tmp-file-mgr-test6:100MB")); + EXPECT_EQ(6, dirs.size()); + EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit); + EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit); + EXPECT_EQ(1234, dirs[2].bytes_limit); + EXPECT_EQ(99999999, dirs[3].bytes_limit); + EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit); + EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit); + + // Various invalid limit formats result in the directory getting skipped. 
+ // Include a valid dir on the end to ensure that we don't short-circuit all + // directories. + auto& dirs2 = GetTmpDirs( + CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:foo,/tmp/tmp-file-mgr-test2:?," + "/tmp/tmp-file-mgr-test3:1.2.3.4,/tmp/tmp-file-mgr-test4: ," + "/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%," + "/tmp/tmp-file-mgr-test1:100")); + EXPECT_EQ(1, dirs2.size()); + EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path); + EXPECT_EQ(100, dirs2[0].bytes_limit); + + // Various valid ways of specifying "unlimited". + auto& dirs3 = + GetTmpDirs(CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:,/tmp/tmp-file-mgr-test2:-1," + "/tmp/tmp-file-mgr-test3,/tmp/tmp-file-mgr-test4:0")); + EXPECT_EQ(4, dirs3.size()); + for (const auto& dir : dirs3) { + EXPECT_EQ(numeric_limits<int64_t>::max(), dir.bytes_limit); + } + + // Extra colons + auto& dirs4 = GetTmpDirs( + CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:1:,/tmp/tmp-file-mgr-test2:10mb::")); + EXPECT_EQ(0, dirs4.size()); + + // Empty strings. 
+ auto& nodirs = GetTmpDirs(CreateTmpFileMgr("")); + EXPECT_EQ(0, nodirs.size()); + auto& empty_paths = GetTmpDirs(CreateTmpFileMgr(",")); + EXPECT_EQ(2, empty_paths.size()); } +} // namespace impala diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 400d4d6..7954a62 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -17,6 +17,8 @@ #include "runtime/tmp-file-mgr.h" +#include <limits> + #include <boost/algorithm/string.hpp> #include <boost/filesystem.hpp> #include <boost/lexical_cast.hpp> @@ -35,6 +37,7 @@ #include "util/debug-util.h" #include "util/disk-info.h" #include "util/filesystem-util.h" +#include "util/parse-util.h" #include "util/pretty-printer.h" #include "util/runtime-profile-counters.h" @@ -43,7 +46,14 @@ DEFINE_bool(disk_spill_encryption, true, "Set this to encrypt and perform an integrity " "check on all data spilled to disk during a query"); -DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories"); +DEFINE_string(scratch_dirs, "/tmp", + "Writable scratch directories. " + "This is a comma-separated list of directories. Each directory is " + "specified as the directory path and an optional limit on the bytes that will " + "be allocated in that directory. If the optional limit is provided, the path and " + "the limit are separated by a colon. E.g. 
'/dir1:10G,/dir2:5GB,/dir3' will allow " + "allocating up to 10GB of scratch in /dir1, 5GB of scratch in /dir2 and an " + "unlimited amount in /dir3."); DEFINE_bool(allow_multiple_scratch_dirs_per_device, true, "If false and --scratch_dirs contains multiple directories on the same device, " "then only the first writable directory is used"); @@ -71,6 +81,8 @@ const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK = "tmp-file-mgr.scratch-space-bytes-used-high-water-mark"; const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED = "tmp-file-mgr.scratch-space-bytes-used"; +const string SCRATCH_DIR_BYTES_USED_FORMAT = + "tmp-file-mgr.scratch-space-bytes-used.dir-$0"; TmpFileMgr::TmpFileMgr() : initialized_(false), @@ -79,27 +91,61 @@ TmpFileMgr::TmpFileMgr() scratch_bytes_used_metric_(nullptr) {} Status TmpFileMgr::Init(MetricGroup* metrics) { - string tmp_dirs_spec = FLAGS_scratch_dirs; + return InitCustom( + FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics); +} + +Status TmpFileMgr::InitCustom( + const string& tmp_dirs_spec, bool one_dir_per_device, MetricGroup* metrics) { vector<string> all_tmp_dirs; // Empty string should be interpreted as no scratch if (!tmp_dirs_spec.empty()) { split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on); } - return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics); + return InitCustom(all_tmp_dirs, one_dir_per_device, metrics); } -Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device, - MetricGroup* metrics) { +Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers, + bool one_dir_per_device, MetricGroup* metrics) { DCHECK(!initialized_); - if (tmp_dirs.empty()) { + if (tmp_dir_specifiers.empty()) { LOG(WARNING) << "Running without spill to disk: no scratch directories provided."; } + vector<TmpDir> tmp_dirs; + // Parse the directory specifiers. 
Don't return an error on parse errors, just log a + // warning - we don't want to abort process startup because of misconfigured scratch, + // since queries will generally still be runnable. + for (const string& tmp_dir_spec : tmp_dir_specifiers) { + vector<string> toks; + split(toks, tmp_dir_spec, is_any_of(":"), token_compress_on); + if (toks.size() > 2) { + LOG(ERROR) << "Could not parse temporary dir specifier, too many colons: '" + << tmp_dir_spec << "'"; + continue; + } + int64_t bytes_limit = numeric_limits<int64_t>::max(); + if (toks.size() == 2) { + bool is_percent; + bytes_limit = ParseUtil::ParseMemSpec(toks[1], &is_percent, 0); + if (bytes_limit < 0 || is_percent) { + LOG(ERROR) << "Malformed data cache capacity configuration '" << tmp_dir_spec + << "'"; + continue; + } else if (bytes_limit == 0) { + // Interpret -1, 0 or empty string as no limit. + bytes_limit = numeric_limits<int64_t>::max(); + } + } + IntGauge* bytes_used_metric = metrics->AddGauge( + SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs.size())); + tmp_dirs.emplace_back(toks[0], bytes_limit, bytes_used_metric); + } vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false); // For each tmp directory, find the disk it is on, // so additional tmp directories on the same disk can be skipped. for (int i = 0; i < tmp_dirs.size(); ++i) { - path tmp_path(trim_right_copy_if(tmp_dirs[i], is_any_of("/"))); + path tmp_path(trim_right_copy_if(tmp_dirs[i].path, is_any_of("/"))); tmp_path = absolute(tmp_path); path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME); // tmp_path must be a writable directory. 
@@ -127,8 +173,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d if (status.ok()) { if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true; LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on " - << "disk " << disk_id; - tmp_dirs_.push_back(scratch_subdir_path.string()); + << "disk " << disk_id + << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs[i].bytes_limit); + tmp_dirs_.emplace_back(scratch_subdir_path.string(), tmp_dirs[i].bytes_limit, + tmp_dirs[i].bytes_used_metric); } else { LOG(WARNING) << "Could not remove and recreate directory " << scratch_subdir_path.string() << ": cannot use it for scratch. " @@ -144,7 +192,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>()); num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size()); for (int i = 0; i < tmp_dirs_.size(); ++i) { - active_scratch_dirs_metric_->Add(tmp_dirs_[i]); + active_scratch_dirs_metric_->Add(tmp_dirs_[i].path); } scratch_bytes_used_metric_ = metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK, @@ -154,7 +202,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d if (tmp_dirs_.empty() && !tmp_dirs.empty()) { LOG(ERROR) << "Running without spill to disk: could not use any scratch " - << "directories in list: " << join(tmp_dirs, ",") + << "directories in list: " << join(tmp_dir_specifiers, ",") << ". 
See previous warnings for information on causes."; } return Status::OK(); @@ -170,7 +218,7 @@ void TmpFileMgr::NewFile( string unique_name = lexical_cast<string>(random_generator()()); stringstream file_name; file_name << PrintId(file_group->unique_id()) << "_" << unique_name; - path new_file_path(tmp_dirs_[device_id]); + path new_file_path(tmp_dirs_[device_id].path); new_file_path /= file_name.str(); new_file->reset(new File(file_group, device_id, new_file_path.string())); @@ -180,7 +228,7 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const { DCHECK(initialized_); DCHECK_GE(device_id, 0); DCHECK_LT(device_id, tmp_dirs_.size()); - return tmp_dirs_[device_id]; + return tmp_dirs_[device_id].path; } int TmpFileMgr::NumActiveTmpDevices() { @@ -206,10 +254,17 @@ TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string& DCHECK(file_group != nullptr); } -void TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) { +bool TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) { DCHECK_GT(num_bytes, 0); + TmpDir* dir = GetDir(); + // Increment optimistically and roll back if the limit is exceeded. + if (dir->bytes_used_metric->Increment(num_bytes) > dir->bytes_limit) { + dir->bytes_used_metric->Increment(-num_bytes); + return false; + } *offset = bytes_allocated_; bytes_allocated_ += num_bytes; + return true; } int TmpFileMgr::File::AssignDiskQueue() const { @@ -223,7 +278,13 @@ void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) { Status TmpFileMgr::File::Remove() { // Remove the file if present (it may not be present if no writes completed). 
- return FileSystemUtil::RemovePaths({path_}); + Status status = FileSystemUtil::RemovePaths({path_}); + GetDir()->bytes_used_metric->Increment(-bytes_allocated_); + return status; +} + +TmpFileMgr::TmpDir* TmpFileMgr::File::GetDir() { + return &file_group_->tmp_file_mgr_->tmp_dirs_[device_id_]; } string TmpFileMgr::File::DebugString() { @@ -272,7 +333,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() { ++files_allocated; } DCHECK_EQ(tmp_files_.size(), files_allocated); - if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus(); + if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus({}); // Start allocating on a random device to avoid overloading the first device. next_allocation_index_ = rand() % tmp_files_.size(); return Status::OK(); @@ -315,18 +376,27 @@ Status TmpFileMgr::FileGroup::AllocateSpace( // Lazily create the files on the first write. if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles()); + // Track the indices of any directories where we failed due to capacity. This is + // required for error reporting if we are totally out of capacity so that it's clear + // that some disks were at capacity. + vector<int> at_capacity_dirs; + // Find the next physical file in round-robin order and allocate a range from it. 
for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) { - *tmp_file = tmp_files_[next_allocation_index_].get(); + int idx = next_allocation_index_; next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size(); + *tmp_file = tmp_files_[idx].get(); if ((*tmp_file)->is_blacklisted()) continue; - (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset); + if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) { + at_capacity_dirs.push_back(idx); + continue; + } scratch_space_bytes_used_counter_->Add(scratch_range_bytes); tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes); current_bytes_allocated_ += num_bytes; return Status::OK(); } - return ScratchAllocationFailedStatus(); + return ScratchAllocationFailedStatus(at_capacity_dirs); } void TmpFileMgr::FileGroup::RecycleFileRange(unique_ptr<WriteHandle> handle) { @@ -484,11 +554,21 @@ Status TmpFileMgr::FileGroup::RecoverWriteError( return handle->RetryWrite(io_ctx_.get(), tmp_file, file_offset); } -Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus() { - Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, - join(tmp_file_mgr_->tmp_dirs_, ","), GetBackendString(), - PrettyPrinter::PrintBytes(scratch_space_bytes_used_counter_->value()), - PrettyPrinter::PrintBytes(current_bytes_allocated_)); +Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus( + const vector<int>& at_capacity_dirs) { + vector<string> tmp_dir_paths; + for (TmpDir& tmp_dir : tmp_file_mgr_->tmp_dirs_) { + tmp_dir_paths.push_back(tmp_dir.path); + } + vector<string> at_capacity_dir_paths; + for (int dir_idx : at_capacity_dirs) { + at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx].path); + } + Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, ","), + GetBackendString(), + PrettyPrinter::PrintBytes(scratch_space_bytes_used_counter_->value()), + PrettyPrinter::PrintBytes(current_bytes_allocated_), + join(at_capacity_dir_paths, ",")); // Include all 
previous errors that may have caused the failure. for (Status& err : scratch_errors_) status.MergeStatus(err); return status; diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 9a38e4f..70ec1d4 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -92,6 +92,22 @@ class TmpFileMgr { /// Same typedef as io::WriteRange::WriteDoneCallback. typedef std::function<void(const Status&)> WriteDoneCallback; + /// A configured temporary directory that TmpFileMgr allocates files in. + struct TmpDir { + TmpDir(const std::string& path, int64_t bytes_limit, IntGauge* bytes_used_metric) + : path(path), bytes_limit(bytes_limit), bytes_used_metric(bytes_used_metric) {} + + /// Path to the temporary directory. + const std::string path; + + /// Limit on bytes that should be written to this path. Set to maximum value + /// of int64_t if there is no limit. + int64_t const bytes_limit; + + /// The current bytes of scratch used for this temporary directory. + IntGauge* const bytes_used_metric; + }; + /// Represents a group of temporary files - one per disk with a scratch directory. The /// total allocated bytes of the group can be bound by setting the space allocation /// limit. The owner of the FileGroup object is responsible for calling the Close() @@ -202,8 +218,11 @@ class TmpFileMgr { /// Return a SCRATCH_ALLOCATION_FAILED error with the appropriate information, /// including scratch directories, the amount of scratch allocated and previous - /// errors that caused this failure. 'lock_' must be held by caller. - Status ScratchAllocationFailedStatus(); + /// errors that caused this failure. If some directories were at capacity, + /// but had not encountered an error, the indices of these directories in + /// tmp_file_mgr_->tmp_dir_ should be included in 'at_capacity_dirs'. + /// 'lock_' must be held by caller. 
+ Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs); /// The TmpFileMgr it is associated with. TmpFileMgr* const tmp_file_mgr_; @@ -390,9 +409,13 @@ class TmpFileMgr { /// Custom initialization - initializes with the provided list of directories. /// If one_dir_per_device is true, only use one temporary directory per device. - /// This interface is intended for testing purposes. - Status InitCustom(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device, + /// This interface is intended for testing purposes. 'tmp_dir_specifiers' + /// use the command-line syntax, i.e. <path>[:<limit>]. The first variant takes + /// a comma-separated list, the second takes a vector. + Status InitCustom(const std::string& tmp_dirs_spec, bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT; + Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers, + bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT; /// Return the scratch directory path for the device. std::string GetTmpDirPath(DeviceId device_id) const; @@ -419,7 +442,7 @@ class TmpFileMgr { bool initialized_; /// The paths of the created tmp directories. - std::vector<std::string> tmp_dirs_; + std::vector<TmpDir> tmp_dirs_; /// Metrics to track active scratch directories. 
IntGauge* num_active_scratch_dirs_metric_; diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index 8c02417..79f50cc 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -82,7 +82,8 @@ auto MakeTestOkFn(TQueryOptions& options, OptionDef<T> option_def) { template<typename T> auto MakeTestErrFn(TQueryOptions& options, OptionDef<T> option_def) { return [&options, option_def](const char* str) { - EXPECT_FALSE(SetQueryOption(option_def.option_name, str, &options, nullptr).ok()); + EXPECT_FALSE(SetQueryOption(option_def.option_name, str, &options, nullptr).ok()) + << option_def.option_name << " " << str; }; } @@ -110,7 +111,7 @@ void TestByteCaseSet(TQueryOptions& options, } TestError(to_string(range.lower_bound - 1).c_str()); TestError(to_string(static_cast<uint64_t>(range.upper_bound) + 1).c_str()); - TestError("1tb"); + TestError("1pb"); TestError("1%"); TestError("1%B"); TestError("1B%"); @@ -120,6 +121,7 @@ void TestByteCaseSet(TQueryOptions& options, {"1 B", 1}, {"0Kb", 0}, {"4G", 4ll * 1024 * 1024 * 1024}, + {"4tb", 4ll * 1024 * 1024 * 1024 * 1024}, {"-1M", -1024 * 1024} }; for (const auto& value_def : common_values) { @@ -307,7 +309,7 @@ TEST(QueryOptions, SetSpecialOptions) { TestOk("0", 0); TestOk("4GB", 4ll * 1024 * 1024 * 1024); TestError("-1MB"); - TestError("1tb"); + TestError("1pb"); TestError("1%"); TestError("1%B"); TestError("1B%"); diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc index 6b8e7a5..c8fb4a0 100644 --- a/be/src/util/parse-util-test.cc +++ b/be/src/util/parse-util-test.cc @@ -35,6 +35,7 @@ TEST(ParseMemSpecs, Basic) { int64_t kilobytes = 1024; int64_t megabytes = 1024 * kilobytes; int64_t gigabytes = 1024 * megabytes; + int64_t terabytes = 1024 * gigabytes; bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem()); ASSERT_EQ(1, bytes); @@ -72,6 +73,14 @@ TEST(ParseMemSpecs, Basic) { ASSERT_EQ(12 * gigabytes, bytes); 
ASSERT_FALSE(is_percent); + bytes = ParseUtil::ParseMemSpec("8T", &is_percent, MemInfo::physical_mem()); + ASSERT_EQ(8 * terabytes, bytes); + ASSERT_FALSE(is_percent); + + bytes = ParseUtil::ParseMemSpec("12tb", &is_percent, MemInfo::physical_mem()); + ASSERT_EQ(12 * terabytes, bytes); + ASSERT_FALSE(is_percent); + bytes = ParseUtil::ParseMemSpec("13%", &is_percent, MemInfo::physical_mem()); ASSERT_GT(bytes, 0); ASSERT_TRUE(is_percent); @@ -91,6 +100,8 @@ TEST(ParseMemSpecs, Basic) { bad_values.push_back("1Bb"); bad_values.push_back("1%%"); bad_values.push_back("1.1"); + bad_values.push_back("1pb"); + bad_values.push_back("1eb"); stringstream ss; ss << UINT64_MAX; bad_values.push_back(ss.str()); diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc index d17cf3e..75253d7 100644 --- a/be/src/util/parse-util.cc +++ b/be/src/util/parse-util.cc @@ -40,6 +40,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent, number_str_len--; } switch (*suffix_char) { + case 't': + case 'T': + // Terabytes. + number_str_len--; + multiplier = 1024L * 1024L * 1024L * 1024L; + break; case 'g': case 'G': // Gigabytes. diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 5e1070d..bb45c3d 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -308,7 +308,8 @@ error_codes = ( ("SCRATCH_ALLOCATION_FAILED", 101, "Could not create files in any configured scratch " "directories (--scratch_dirs=$0) on backend '$1'. $2 of scratch is currently in " "use by this Impala Daemon ($3 by this query). See logs for previous errors that " - "may have prevented creating or writing scratch files."), + "may have prevented creating or writing scratch files. 
The following directories " + "were at capacity: $4"), ("SCRATCH_READ_TRUNCATED", 102, "Error reading $0 bytes from scratch file '$1' " "on backend $2 at offset $3: could only read $4 bytes"), diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index bd30674..1ce9264 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -2151,6 +2151,16 @@ "key": "tmp-file-mgr.scratch-space-bytes-used-high-water-mark" }, { + "description": "The current total spilled bytes for a single scratch directory.", + "contexts": [ + "IMPALAD" + ], + "label": "Per-directory scratch space bytes used", + "units": "BYTES", + "kind": "GAUGE", + "key": "tmp-file-mgr.scratch-space-bytes-used.dir-$0" + }, + { "description": "Number of senders waiting for receiving fragment to initialize", "contexts": [ "IMPALAD" diff --git a/docs/topics/impala_mem_limit.xml b/docs/topics/impala_mem_limit.xml index 2bd89e5..d61edf3 100644 --- a/docs/topics/impala_mem_limit.xml +++ b/docs/topics/impala_mem_limit.xml @@ -167,10 +167,10 @@ MEM_LIMIT set to 3mb </p> <codeblock rev=""> -[localhost:21000] > set mem_limit=3tb; -MEM_LIMIT set to 3tb +[localhost:21000] > set mem_limit=3pb; +MEM_LIMIT set to 3pb [localhost:21000] > select 5; -ERROR: Failed to parse query memory limit from '3tb'. +ERROR: Failed to parse query memory limit from '3pb'. [localhost:21000] > set mem_limit=xyz; MEM_LIMIT set to xyz