Repository: incubator-impala Updated Branches: refs/heads/master 6d15f0377 -> 4ce5213d1
IMPALA-3202: variable-length scratch file ranges This uses a simple approach where scratch ranges are managed in power-of-two size classes and we don't attempt to coalesce or split the ranges to move them into different size classes. Thus this does not optimally reuse space if a query spills a variety of page sizes, but improving this may not be worth the added complexity. Testing: Extended tmp-file-mgr-test to exercise the variable scratch range support. We will get system test coverage once this is used by the new buffer pool. Change-Id: Ic0ad84493c2c93a5602c404a83c718f25ea25575 Reviewed-on: http://gerrit.cloudera.org:8080/5597 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fac000d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fac000d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fac000d3 Branch: refs/heads/master Commit: fac000d311db365f6c770de4e5205a506d209a5b Parents: 6d15f03 Author: Tim Armstrong <[email protected]> Authored: Tue Dec 20 15:48:44 2016 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jan 5 19:22:49 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/buffered-block-mgr.cc | 2 +- be/src/runtime/tmp-file-mgr-test.cc | 93 +++++++++++++++---------------- be/src/runtime/tmp-file-mgr.cc | 61 ++++++++++---------- be/src/runtime/tmp-file-mgr.h | 37 ++++++------ 4 files changed, 96 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index d4e14a2..199807b 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -1228,7 +1228,7 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, parent_profile->AddChild(profile_.get()); tmp_file_group_.reset(new TmpFileMgr::FileGroup( - tmp_file_mgr, io_mgr, profile_.get(), query_id_, max_block_size_, scratch_limit)); + tmp_file_mgr, io_mgr, profile_.get(), query_id_, scratch_limit)); mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES); mem_limit_counter_->Set(mem_limit); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 61fd682..fe384b8 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -203,8 +203,7 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) { TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get()); TUniqueId id; - TmpFileMgr::FileGroup file_group( - &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); // Only the first directory should be used. EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices()); @@ -228,8 +227,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) { TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); TUniqueId id; - TmpFileMgr::FileGroup file_group( - &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); // Both directories should be used. EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices()); @@ -257,8 +255,7 @@ TEST_F(TmpFileMgrTest, TestReportError) { TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); TUniqueId id; - TmpFileMgr::FileGroup file_group( - &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); // Both directories should be used. vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices(); @@ -307,8 +304,7 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) { TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); TUniqueId id; - TmpFileMgr::FileGroup file_group( - &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id); vector<TmpFileMgr::File*> allocated_files; ASSERT_OK(CreateFiles(&file_group, &allocated_files)) @@ -334,11 +330,11 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) { TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - const int64_t LIMIT = 100; - const int64_t ALLOC_SIZE = 50; + const int64_t LIMIT = 128; + // A power-of-two so that FileGroup allocates exactly this amount of scratch space. + const int64_t ALLOC_SIZE = 64; TUniqueId id; - TmpFileMgr::FileGroup file_group( - &tmp_file_mgr, io_mgr(), profile_, id, ALLOC_SIZE, LIMIT); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id, LIMIT); vector<TmpFileMgr::File*> files; ASSERT_OK(CreateFiles(&file_group, &files)); @@ -367,46 +363,49 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) { file_group.Close(); } -// Test that scratch file ranges are recycled as expected. +// Test that scratch file ranges of varying length are recycled as expected. TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) { - const int64_t ALLOC_SIZE = 50; TUniqueId id; - TmpFileMgr::FileGroup file_group( - test_env_->tmp_file_mgr(), io_mgr(), profile_, id, ALLOC_SIZE); - - // Generate some data. - const int BLOCKS = 5; - vector<vector<uint8_t>> data(BLOCKS); - for (int i = 0; i < BLOCKS; ++i) { - data[i].resize(ALLOC_SIZE); - std::iota(data[i].begin(), data[i].end(), i); - } - - DiskIoMgr::WriteRange::WriteDoneCallback callback = - bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1); - vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS); - // Make sure free space doesn't grow over several iterations. - const int TEST_ITERS = 5; - for (int i = 0; i < TEST_ITERS; ++i) { - cb_counter_ = 0; - for (int j = 0; j < BLOCKS; ++j) { - ASSERT_OK( - file_group.Write(MemRange(data[j].data(), ALLOC_SIZE), callback, &handles[j])); + TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), profile_, id); + int64_t expected_scratch_bytes_allocated = 0; + // Test some different allocation sizes. + for (int alloc_size = 64; alloc_size <= 64 * 1024; alloc_size *= 2) { + // Generate some data. + const int BLOCKS = 5; + vector<vector<uint8_t>> data(BLOCKS); + for (int i = 0; i < BLOCKS; ++i) { + data[i].resize(alloc_size); + std::iota(data[i].begin(), data[i].end(), i); } - WaitForCallbacks(BLOCKS); - EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group)); - - // Read back and validate. - for (int j = 0; j < BLOCKS; ++j) { - uint8_t tmp[ALLOC_SIZE]; - ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp, ALLOC_SIZE))); - EXPECT_EQ(0, memcmp(tmp, data[j].data(), ALLOC_SIZE)); - file_group.DestroyWriteHandle(move(handles[j])); + + DiskIoMgr::WriteRange::WriteDoneCallback callback = + bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1); + vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS); + // 'file_group' should allocate extra scratch bytes for this 'alloc_size'. + expected_scratch_bytes_allocated += alloc_size * BLOCKS; + const int TEST_ITERS = 5; + // Make sure free space doesn't grow over several iterations. + for (int i = 0; i < TEST_ITERS; ++i) { + cb_counter_ = 0; + for (int j = 0; j < BLOCKS; ++j) { + ASSERT_OK(file_group.Write( + MemRange(data[j].data(), alloc_size), callback, &handles[j])); + } + WaitForCallbacks(BLOCKS); + EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group)); + + // Read back and validate. + for (int j = 0; j < BLOCKS; ++j) { + vector<uint8_t> tmp(alloc_size); + ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp.data(), alloc_size))); + EXPECT_EQ(0, memcmp(tmp.data(), data[j].data(), alloc_size)); + file_group.DestroyWriteHandle(move(handles[j])); + } + // Check that the space is still in use - it should be recycled by the next + // iteration. + EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group)); } - // Check that the space is still in use - it should be recycled by the next iteration. - EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group)); } - file_group.Close(); test_env_->TearDownRuntimeStates(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index bf2b7ec..d6f0010 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -26,8 +26,10 @@ #include <gutil/strings/join.h> #include <gutil/strings/substitute.h> +#include "gutil/bits.h" #include "runtime/runtime-state.h" #include "runtime/tmp-file-mgr-internal.h" +#include "util/bit-util.h" #include "util/debug-util.h" #include "util/disk-info.h" #include "util/filesystem-util.h" @@ -222,13 +224,11 @@ string TmpFileMgr::File::DebugString() { } TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, - RuntimeProfile* profile, const TUniqueId& unique_id, int64_t block_size, - int64_t bytes_limit) + RuntimeProfile* profile, const TUniqueId& unique_id, int64_t bytes_limit) : tmp_file_mgr_(tmp_file_mgr), io_mgr_(io_mgr), io_ctx_(nullptr), unique_id_(unique_id), - block_size_(block_size), bytes_limit_(bytes_limit), write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)), bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)), @@ -239,8 +239,8 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")), encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")), current_bytes_allocated_(0), - next_allocation_index_(0) { - DCHECK_GT(block_size_, 0); + next_allocation_index_(0), + free_ranges_(64) { DCHECK(tmp_file_mgr != nullptr); io_mgr_->RegisterContext(&io_ctx_, nullptr); } @@ -301,17 +301,18 @@ void TmpFileMgr::FileGroup::Close() { Status TmpFileMgr::FileGroup::AllocateSpace( int64_t num_bytes, File** tmp_file, int64_t* file_offset) { - DCHECK_LE(num_bytes, block_size_); lock_guard<SpinLock> lock(lock_); - - if (!free_ranges_.empty()) { - *tmp_file = free_ranges_.back().first; - *file_offset = free_ranges_.back().second; - free_ranges_.pop_back(); + int64_t scratch_range_bytes = max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(num_bytes)); + int free_ranges_idx = Bits::Log2Ceiling64(scratch_range_bytes); + if (!free_ranges_[free_ranges_idx].empty()) { + *tmp_file = free_ranges_[free_ranges_idx].back().first; + *file_offset = free_ranges_[free_ranges_idx].back().second; + free_ranges_[free_ranges_idx].pop_back(); return Status::OK(); } - if (bytes_limit_ != -1 && current_bytes_allocated_ + block_size_ > bytes_limit_) { + if (bytes_limit_ != -1 + && current_bytes_allocated_ + scratch_range_bytes > bytes_limit_) { return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_); } @@ -323,9 +324,9 @@ Status TmpFileMgr::FileGroup::AllocateSpace( *tmp_file = tmp_files_[next_allocation_index_].get(); next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size(); if ((*tmp_file)->is_blacklisted()) continue; - Status status = (*tmp_file)->AllocateSpace(block_size_, file_offset); + Status status = (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset); if (status.ok()) { - scratch_space_bytes_used_counter_->Add(block_size_); + scratch_space_bytes_used_counter_->Add(scratch_range_bytes); current_bytes_allocated_ += num_bytes; return Status::OK(); } @@ -347,9 +348,13 @@ Status TmpFileMgr::FileGroup::AllocateSpace( return err_status; } -void TmpFileMgr::FileGroup::AddFreeRange(File* file, int64_t offset) { +void TmpFileMgr::FileGroup::RecycleFileRange(unique_ptr<WriteHandle> handle) { + int64_t scratch_range_bytes = + max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(handle->len())); + int free_ranges_idx = Bits::Log2Ceiling64(scratch_range_bytes); lock_guard<SpinLock> lock(lock_); - free_ranges_.emplace_back(file, offset); + free_ranges_[free_ranges_idx].emplace_back( + handle->file_, handle->write_range_->offset()); } Status TmpFileMgr::FileGroup::Write( @@ -421,16 +426,14 @@ Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData( status = handle->CheckHashAndDecrypt(buffer); } handle->WaitForWrite(); - AddFreeRange(handle->file_, handle->write_range_->offset()); - handle.reset(); + RecycleFileRange(move(handle)); return status; } void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) { handle->Cancel(); handle->WaitForWrite(); - AddFreeRange(handle->file_, handle->write_range_->offset()); - handle.reset(); + RecycleFileRange(move(handle)); } void TmpFileMgr::FileGroup::WriteComplete( @@ -477,17 +480,15 @@ Status TmpFileMgr::FileGroup::RecoverWriteError( string TmpFileMgr::FileGroup::DebugString() { lock_guard<SpinLock> lock(lock_); stringstream ss; - ss << "FileGroup " << this << " block size " << block_size_ - << " bytes limit " << bytes_limit_ + ss << "FileGroup " << this << " bytes limit " << bytes_limit_ << " current bytes allocated " << current_bytes_allocated_ - << " next allocation index " << next_allocation_index_ - << " writes " << write_counter_->value() - << " bytes written " << bytes_written_counter_->value() - << " reads " << read_counter_->value() - << " bytes read " << bytes_read_counter_->value() - << " scratch bytes used " << scratch_space_bytes_used_counter_ - << " dist read timer " << disk_read_timer_->value() - << " encryption timer " << encryption_timer_->value() << endl + << " next allocation index " << next_allocation_index_ << " writes " + << write_counter_->value() << " bytes written " << bytes_written_counter_->value() + << " reads " << read_counter_->value() << " bytes read " + << bytes_read_counter_->value() << " scratch bytes used " + << scratch_space_bytes_used_counter_ << " dist read timer " + << disk_read_timer_->value() << " encryption timer " << encryption_timer_->value() + << endl << " " << tmp_files_.size() << " files:" << endl; for (unique_ptr<File>& file : tmp_files_) { ss << " " << file->DebugString() << endl; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 0c3e974..a8d63b2 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -55,18 +55,22 @@ namespace impala { /// Each WriteHandle is backed by a range of data in a scratch file. The first call to /// Write() will create files for the FileGroup with unique filenames on the configured /// temporary devices. At most one directory per device is used (unless overridden for -/// testing). Free space is managed within a FileGroup: once a WriteHandle is destroyed, -/// the file range backing it can be recycled for a different WriteHandle. The file range -/// of a WriteHandle can be replaced with a different one if a write error is encountered -/// and the data instead needs to be written to a different disk. +/// testing). The file range of a WriteHandle can be replaced with a different one if +/// a write error is encountered and the data instead needs to be written to a different +/// disk. +/// +/// Free Space Management: +/// Free space is managed within a FileGroup: once a WriteHandle is destroyed, the file +/// range backing it can be recycled for a different WriteHandle. Scratch file ranges +/// are grouped into size classes, each for a power-of-two number of bytes. Free file +/// ranges of each size class are managed separately (i.e. there is no splitting or +/// coalescing of ranges). /// /// Resource Management: /// TmpFileMgr provides some basic support for managing local disk space consumption. /// A FileGroup can be created with a limit on the total number of bytes allocated across /// all files. Writes that would exceed the limit fail with an error status. /// -/// TODO: each FileGroup can manage only fixed length scratch file ranges of 'block_size', -/// to simplify the recycling logic. BufferPool will require variable length ranges. /// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to /// temporarily blacklist devices that show I/O errors. class TmpFileMgr { @@ -94,10 +98,9 @@ class TmpFileMgr { /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file /// names. It is an error to create multiple FileGroups with the same 'unique_id'. - /// 'block_size' is the size of blocks in bytes that space will be allocated in. /// 'bytes_limit' is the limit on the total file space to allocate. FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile, - const TUniqueId& unique_id, int64_t block_size, int64_t bytes_limit = -1); + const TUniqueId& unique_id, int64_t bytes_limit = -1); ~FileGroup(); @@ -109,9 +112,6 @@ class TmpFileMgr { /// compression). The caller should not modify the data in 'buffer' until the write /// completes or is cancelled, otherwise invalid data may be written to disk. /// - /// TODO: buffer->len must be <= 'block_size' until FileGroup supports allocating - /// variable-length scratch files ranges. - /// /// Returns an error if the scratch space cannot be allocated or the write cannot /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from /// a different thread when the write completes successfully or unsuccessfully or is @@ -160,8 +160,9 @@ class TmpFileMgr { /// limit is exceeded. Must be called without 'lock_' held. Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset); - /// Add a free scratch range to 'free_ranges_'. Must be called without 'lock_' held. - void AddFreeRange(File* file, int64_t offset); + /// Add the scratch range from 'handle' to 'free_ranges_' and destroy handle. Must be + /// called without 'lock_' held. + void RecycleFileRange(std::unique_ptr<WriteHandle> handle); /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt /// to retry the write. On success or if the write can't be retried, calls @@ -193,10 +194,6 @@ class TmpFileMgr { /// Unique across all FileGroups. Used to prefix file names. const TUniqueId unique_id_; - /// Size of the blocks in bytes that scratch space is managed in. - /// TODO: support variable-length scratch file ranges. - const int64_t block_size_; - /// Max write space allowed (-1 means no limit). const int64_t bytes_limit_; @@ -235,8 +232,10 @@ class TmpFileMgr { /// files. int next_allocation_index_; - /// List of File/offset pairs for free scratch ranges of size 'block_size_' bytes. - std::vector<std::pair<File*, int64_t>> free_ranges_; + /// Each vector in free_ranges_[i] is a vector of File/offset pairs for free scratch + /// ranges of length 2^i bytes. Has 64 entries so that every int64_t length has a + /// valid list associated with it. + std::vector<std::vector<std::pair<File*, int64_t>>> free_ranges_; /// Errors encountered when creating/writing scratch files. We store the history so /// that we can report the original cause of the scratch errors if we run out of
