Repository: incubator-impala
Updated Branches:
  refs/heads/master 6d15f0377 -> 4ce5213d1


IMPALA-3202: variable-length scratch file ranges

This uses a simple approach where scratch ranges are managed in
power-of-two size classes and we don't attempt to coalesce or
split the ranges to move them into different size classes. Thus
this does not optimally reuse space if a query spills a variety
of page sizes, but improving this may not be worth the added
complexity.

Testing:
Extended tmp-file-mgr-test to exercise the variable scratch range
support. We will get system test coverage once this is used by the
new buffer pool.

Change-Id: Ic0ad84493c2c93a5602c404a83c718f25ea25575
Reviewed-on: http://gerrit.cloudera.org:8080/5597
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fac000d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fac000d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fac000d3

Branch: refs/heads/master
Commit: fac000d311db365f6c770de4e5205a506d209a5b
Parents: 6d15f03
Author: Tim Armstrong <[email protected]>
Authored: Tue Dec 20 15:48:44 2016 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jan 5 19:22:49 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr.cc |  2 +-
 be/src/runtime/tmp-file-mgr-test.cc  | 93 +++++++++++++++----------------
 be/src/runtime/tmp-file-mgr.cc       | 61 ++++++++++----------
 be/src/runtime/tmp-file-mgr.h        | 37 ++++++------
 4 files changed, 96 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index d4e14a2..199807b 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -1228,7 +1228,7 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr,
   parent_profile->AddChild(profile_.get());
 
   tmp_file_group_.reset(new TmpFileMgr::FileGroup(
-      tmp_file_mgr, io_mgr, profile_.get(), query_id_, max_block_size_, scratch_limit));
+      tmp_file_mgr, io_mgr, profile_.get(), query_id_, scratch_limit));
 
   mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES);
   mem_limit_counter_->Set(mem_limit);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 61fd682..fe384b8 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -203,8 +203,7 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get());
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(
-      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
   // Only the first directory should be used.
   EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
@@ -228,8 +227,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(
-      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
   // Both directories should be used.
   EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
@@ -257,8 +255,7 @@ TEST_F(TmpFileMgrTest, TestReportError) {
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(
-      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
   // Both directories should be used.
   vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
@@ -307,8 +304,7 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(
-      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
   vector<TmpFileMgr::File*> allocated_files;
   ASSERT_OK(CreateFiles(&file_group, &allocated_files))
@@ -334,11 +330,11 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
 
-  const int64_t LIMIT = 100;
-  const int64_t ALLOC_SIZE = 50;
+  const int64_t LIMIT = 128;
+  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
+  const int64_t ALLOC_SIZE = 64;
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(
-      &tmp_file_mgr, io_mgr(), profile_, id, ALLOC_SIZE, LIMIT);
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id, LIMIT);
 
   vector<TmpFileMgr::File*> files;
   ASSERT_OK(CreateFiles(&file_group, &files));
@@ -367,46 +363,49 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
   file_group.Close();
 }
 
-// Test that scratch file ranges are recycled as expected.
+// Test that scratch file ranges of varying length are recycled as expected.
 TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
-  const int64_t ALLOC_SIZE = 50;
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(
-      test_env_->tmp_file_mgr(), io_mgr(), profile_, id, ALLOC_SIZE);
-
-  // Generate some data.
-  const int BLOCKS = 5;
-  vector<vector<uint8_t>> data(BLOCKS);
-  for (int i = 0; i < BLOCKS; ++i) {
-    data[i].resize(ALLOC_SIZE);
-    std::iota(data[i].begin(), data[i].end(), i);
-  }
-
-  DiskIoMgr::WriteRange::WriteDoneCallback callback =
-      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
-  vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS);
-  // Make sure free space doesn't grow over several iterations.
-  const int TEST_ITERS = 5;
-  for (int i = 0; i < TEST_ITERS; ++i) {
-    cb_counter_ = 0;
-    for (int j = 0; j < BLOCKS; ++j) {
-      ASSERT_OK(
-          file_group.Write(MemRange(data[j].data(), ALLOC_SIZE), callback, &handles[j]));
+  TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), profile_, id);
+  int64_t expected_scratch_bytes_allocated = 0;
+  // Test some different allocation sizes.
+  for (int alloc_size = 64; alloc_size <= 64 * 1024; alloc_size *= 2) {
+    // Generate some data.
+    const int BLOCKS = 5;
+    vector<vector<uint8_t>> data(BLOCKS);
+    for (int i = 0; i < BLOCKS; ++i) {
+      data[i].resize(alloc_size);
+      std::iota(data[i].begin(), data[i].end(), i);
     }
-    WaitForCallbacks(BLOCKS);
-    EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group));
-
-    // Read back and validate.
-    for (int j = 0; j < BLOCKS; ++j) {
-      uint8_t tmp[ALLOC_SIZE];
-      ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp, ALLOC_SIZE)));
-      EXPECT_EQ(0, memcmp(tmp, data[j].data(), ALLOC_SIZE));
-      file_group.DestroyWriteHandle(move(handles[j]));
+
+    DiskIoMgr::WriteRange::WriteDoneCallback callback =
+        bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+    vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS);
+    // 'file_group' should allocate extra scratch bytes for this 'alloc_size'.
+    expected_scratch_bytes_allocated += alloc_size * BLOCKS;
+    const int TEST_ITERS = 5;
+    // Make sure free space doesn't grow over several iterations.
+    for (int i = 0; i < TEST_ITERS; ++i) {
+      cb_counter_ = 0;
+      for (int j = 0; j < BLOCKS; ++j) {
+        ASSERT_OK(file_group.Write(
+            MemRange(data[j].data(), alloc_size), callback, &handles[j]));
+      }
+      WaitForCallbacks(BLOCKS);
+      EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group));
+
+      // Read back and validate.
+      for (int j = 0; j < BLOCKS; ++j) {
+        vector<uint8_t> tmp(alloc_size);
+        ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp.data(), alloc_size)));
+        EXPECT_EQ(0, memcmp(tmp.data(), data[j].data(), alloc_size));
+        file_group.DestroyWriteHandle(move(handles[j]));
+      }
+      // Check that the space is still in use - it should be recycled by the next
+      // iteration.
+      EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group));
     }
-    // Check that the space is still in use - it should be recycled by the next iteration.
-    EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group));
   }
-
   file_group.Close();
   test_env_->TearDownRuntimeStates();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index bf2b7ec..d6f0010 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -26,8 +26,10 @@
 #include <gutil/strings/join.h>
 #include <gutil/strings/substitute.h>
 
+#include "gutil/bits.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr-internal.h"
+#include "util/bit-util.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
@@ -222,13 +224,11 @@ string TmpFileMgr::File::DebugString() {
 }
 
 TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
-    RuntimeProfile* profile, const TUniqueId& unique_id, int64_t block_size,
-    int64_t bytes_limit)
+    RuntimeProfile* profile, const TUniqueId& unique_id, int64_t bytes_limit)
   : tmp_file_mgr_(tmp_file_mgr),
     io_mgr_(io_mgr),
     io_ctx_(nullptr),
     unique_id_(unique_id),
-    block_size_(block_size),
     bytes_limit_(bytes_limit),
     write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
     bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
@@ -239,8 +239,8 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
     encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
     current_bytes_allocated_(0),
-    next_allocation_index_(0) {
-  DCHECK_GT(block_size_, 0);
+    next_allocation_index_(0),
+    free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
   io_mgr_->RegisterContext(&io_ctx_, nullptr);
 }
@@ -301,17 +301,18 @@ void TmpFileMgr::FileGroup::Close() {
 
 Status TmpFileMgr::FileGroup::AllocateSpace(
     int64_t num_bytes, File** tmp_file, int64_t* file_offset) {
-  DCHECK_LE(num_bytes, block_size_);
   lock_guard<SpinLock> lock(lock_);
-
-  if (!free_ranges_.empty()) {
-    *tmp_file = free_ranges_.back().first;
-    *file_offset = free_ranges_.back().second;
-    free_ranges_.pop_back();
+  int64_t scratch_range_bytes = max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(num_bytes));
+  int free_ranges_idx = Bits::Log2Ceiling64(scratch_range_bytes);
+  if (!free_ranges_[free_ranges_idx].empty()) {
+    *tmp_file = free_ranges_[free_ranges_idx].back().first;
+    *file_offset = free_ranges_[free_ranges_idx].back().second;
+    free_ranges_[free_ranges_idx].pop_back();
     return Status::OK();
   }
 
-  if (bytes_limit_ != -1 && current_bytes_allocated_ + block_size_ > bytes_limit_) {
+  if (bytes_limit_ != -1
+      && current_bytes_allocated_ + scratch_range_bytes > bytes_limit_) {
     return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_);
   }
 
@@ -323,9 +324,9 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
     *tmp_file = tmp_files_[next_allocation_index_].get();
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(block_size_, file_offset);
+    Status status = (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset);
     if (status.ok()) {
-      scratch_space_bytes_used_counter_->Add(block_size_);
+      scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
       current_bytes_allocated_ += num_bytes;
       return Status::OK();
     }
@@ -347,9 +348,13 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
   return err_status;
 }
 
-void TmpFileMgr::FileGroup::AddFreeRange(File* file, int64_t offset) {
+void TmpFileMgr::FileGroup::RecycleFileRange(unique_ptr<WriteHandle> handle) {
+  int64_t scratch_range_bytes =
+      max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(handle->len()));
+  int free_ranges_idx = Bits::Log2Ceiling64(scratch_range_bytes);
   lock_guard<SpinLock> lock(lock_);
-  free_ranges_.emplace_back(file, offset);
+  free_ranges_[free_ranges_idx].emplace_back(
+      handle->file_, handle->write_range_->offset());
 }
 
 Status TmpFileMgr::FileGroup::Write(
@@ -421,16 +426,14 @@ Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(
     status = handle->CheckHashAndDecrypt(buffer);
   }
   handle->WaitForWrite();
-  AddFreeRange(handle->file_, handle->write_range_->offset());
-  handle.reset();
+  RecycleFileRange(move(handle));
   return status;
 }
 
 void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) {
   handle->Cancel();
   handle->WaitForWrite();
-  AddFreeRange(handle->file_, handle->write_range_->offset());
-  handle.reset();
+  RecycleFileRange(move(handle));
 }
 
 void TmpFileMgr::FileGroup::WriteComplete(
@@ -477,17 +480,15 @@ Status TmpFileMgr::FileGroup::RecoverWriteError(
 string TmpFileMgr::FileGroup::DebugString() {
   lock_guard<SpinLock> lock(lock_);
   stringstream ss;
-  ss << "FileGroup " << this << " block size " << block_size_
-     << " bytes limit " << bytes_limit_
+  ss << "FileGroup " << this << " bytes limit " << bytes_limit_
      << " current bytes allocated " << current_bytes_allocated_
-     << " next allocation index " << next_allocation_index_
-     << " writes " << write_counter_->value()
-     << " bytes written " << bytes_written_counter_->value()
-     << " reads " << read_counter_->value()
-     << " bytes read " << bytes_read_counter_->value()
-     << " scratch bytes used " << scratch_space_bytes_used_counter_
-     << " dist read timer " << disk_read_timer_->value()
-     << " encryption timer " << encryption_timer_->value() << endl
+     << " next allocation index " << next_allocation_index_ << " writes "
+     << write_counter_->value() << " bytes written " << 
bytes_written_counter_->value()
+     << " reads " << read_counter_->value() << " bytes read "
+     << bytes_read_counter_->value() << " scratch bytes used "
+     << scratch_space_bytes_used_counter_ << " dist read timer "
+     << disk_read_timer_->value() << " encryption timer " << 
encryption_timer_->value()
+     << endl
      << "  " << tmp_files_.size() << " files:" << endl;
   for (unique_ptr<File>& file : tmp_files_) {
     ss << "    " << file->DebugString() << endl;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fac000d3/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 0c3e974..a8d63b2 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -55,18 +55,22 @@ namespace impala {
 /// Each WriteHandle is backed by a range of data in a scratch file. The first call to
 /// Write() will create files for the FileGroup with unique filenames on the configured
 /// temporary devices. At most one directory per device is used (unless overridden for
-/// testing). Free space is managed within a FileGroup: once a WriteHandle is destroyed,
-/// the file range backing it can be recycled for a different WriteHandle. The file range
-/// of a WriteHandle can be replaced with a different one if a write error is encountered
-/// and the data instead needs to be written to a different disk.
+/// testing). The file range of a WriteHandle can be replaced with a different one if
+/// a write error is encountered and the data instead needs to be written to a different
+/// disk.
+///
+/// Free Space Management:
+/// Free space is managed within a FileGroup: once a WriteHandle is destroyed, the file
+/// range backing it can be recycled for a different WriteHandle. Scratch file ranges
+/// are grouped into size classes, each for a power-of-two number of bytes. Free file
+/// ranges of each size class are managed separately (i.e. there is no splitting or
+/// coalescing of ranges).
 ///
 /// Resource Management:
 /// TmpFileMgr provides some basic support for managing local disk space consumption.
 /// A FileGroup can be created with a limit on the total number of bytes allocated across
 /// all files. Writes that would exceed the limit fail with an error status.
 ///
-/// TODO: each FileGroup can manage only fixed length scratch file ranges of 'block_size',
-/// to simplify the recycling logic. BufferPool will require variable length ranges.
 /// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to
 /// temporarily blacklist devices that show I/O errors.
 class TmpFileMgr {
@@ -94,10 +98,9 @@ class TmpFileMgr {
     /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch
     /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
     /// names. It is an error to create multiple FileGroups with the same 'unique_id'.
-    /// 'block_size' is the size of blocks in bytes that space will be allocated in.
     /// 'bytes_limit' is the limit on the total file space to allocate.
     FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile,
-        const TUniqueId& unique_id, int64_t block_size, int64_t bytes_limit = -1);
+        const TUniqueId& unique_id, int64_t bytes_limit = -1);
 
     ~FileGroup();
 
@@ -109,9 +112,6 @@ class TmpFileMgr {
     /// compression). The caller should not modify the data in 'buffer' until the write
     /// completes or is cancelled, otherwise invalid data may be written to disk.
     ///
-    /// TODO: buffer->len must be <= 'block_size' until FileGroup supports allocating
-    /// variable-length scratch files ranges.
-    ///
     /// Returns an error if the scratch space cannot be allocated or the write cannot
     /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
     /// a different thread when the write completes successfully or unsuccessfully or is
@@ -160,8 +160,9 @@ class TmpFileMgr {
     /// limit is exceeded. Must be called without 'lock_' held.
     Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
 
-    /// Add a free scratch range to 'free_ranges_'. Must be called without 'lock_' held.
-    void AddFreeRange(File* file, int64_t offset);
+    /// Add the scratch range from 'handle' to 'free_ranges_' and destroy handle. Must be
+    /// called without 'lock_' held.
+    void RecycleFileRange(std::unique_ptr<WriteHandle> handle);
 
     /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
     /// to retry the write. On success or if the write can't be retried, calls
@@ -193,10 +194,6 @@ class TmpFileMgr {
     /// Unique across all FileGroups. Used to prefix file names.
     const TUniqueId unique_id_;
 
-    /// Size of the blocks in bytes that scratch space is managed in.
-    /// TODO: support variable-length scratch file ranges.
-    const int64_t block_size_;
-
     /// Max write space allowed (-1 means no limit).
     const int64_t bytes_limit_;
 
@@ -235,8 +232,10 @@ class TmpFileMgr {
     /// files.
     int next_allocation_index_;
 
-    /// List of File/offset pairs for free scratch ranges of size 'block_size_' bytes.
-    std::vector<std::pair<File*, int64_t>> free_ranges_;
+    /// Each vector in free_ranges_[i] is a vector of File/offset pairs for free scratch
+    /// ranges of length 2^i bytes. Has 64 entries so that every int64_t length has a
+    /// valid list associated with it.
+    std::vector<std::vector<std::pair<File*, int64_t>>> free_ranges_;
 
     /// Errors encountered when creating/writing scratch files. We store the history so
     /// that we can report the original cause of the scratch errors if we run out of

Reply via email to