IMPALA-5417: make I/O buffer queue fixed-size

This removes the dynamically-varying queue size behaviour in the I/O manager. The motivation is to bound resource consumption of scans and make it possible to reserve memory for I/O buffers upfront.
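To make the effect of a fixed-size queue concrete, the following is a minimal standalone sketch of the producer/consumer pattern this change moves to. It is not the patch's ScanRange code; the names FixedBufferQueue, Buffer and kReadyBufferLimit are invented for illustration, and cancellation/error handling is omitted.

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>

struct Buffer {
  std::vector<char> data;
  bool eosr = false;  // end of scan range
};

class FixedBufferQueue {
 public:
  // Fixed capacity, analogous in spirit to SCAN_RANGE_READY_BUFFER_LIMIT in the patch.
  static constexpr int kReadyBufferLimit = 2;

  // Producer side (disk thread). Returns true if the queue is now full, i.e. the
  // producer should stop issuing reads for this range until a buffer is consumed.
  bool Enqueue(std::unique_ptr<Buffer> buffer) {
    std::unique_lock<std::mutex> l(lock_);
    ready_buffers_.push_back(std::move(buffer));
    bool full = ready_buffers_.size() >= kReadyBufferLimit;
    buffer_ready_.notify_one();
    return full;
  }

  // Consumer side (scanner thread). Blocks until a buffer is available. Sets
  // *unblocked when the queue drops below capacity again, which is the signal
  // to reschedule the range with the I/O manager.
  std::unique_ptr<Buffer> Dequeue(bool* unblocked) {
    std::unique_lock<std::mutex> l(lock_);
    buffer_ready_.wait(l, [this] { return !ready_buffers_.empty(); });
    bool was_full = ready_buffers_.size() >= kReadyBufferLimit;
    std::unique_ptr<Buffer> result = std::move(ready_buffers_.front());
    ready_buffers_.pop_front();
    *unblocked = was_full && ready_buffers_.size() < kReadyBufferLimit;
    return result;
  }

 private:
  std::mutex lock_;
  std::condition_variable buffer_ready_;
  std::deque<std::unique_ptr<Buffer>> ready_buffers_;
};

Because the capacity is a constant rather than adjusted at runtime, the worst-case buffer memory per scan range is known before the scan starts, which is what makes upfront reservation possible.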
Does some cleanup/documentation of the locking policy. Fix some cases in ScanRange::GetNext() where members documented as being protected by ScanRange::lock_ were accessed without holding it. I think the races were either benign or prevented by holding DiskIoRequestContext::lock_ in practice. Testing: Ran exhaustive build. Perf: Ran the full set of workloads (TPC-H, TPC-DS, targeted) on a 16 node cluster. Everything was within normal variance. Change-Id: If7cc3f7199f5320db00b7face97a96cdadb6f83f Reviewed-on: http://gerrit.cloudera.org:8080/7408 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/50d603d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/50d603d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/50d603d3 Branch: refs/heads/master Commit: 50d603d306c58ef42605eeb715356e1003dedeb4 Parents: 3e73ce2 Author: Tim Armstrong <[email protected]> Authored: Wed Jun 14 08:43:58 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Sep 15 01:59:29 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/disk-io-mgr-internal.h | 23 +- be/src/runtime/disk-io-mgr-reader-context.cc | 2 - be/src/runtime/disk-io-mgr-scan-range.cc | 70 ++-- be/src/runtime/disk-io-mgr-test.cc | 445 ++++++++++------------ be/src/runtime/disk-io-mgr.cc | 10 +- be/src/runtime/disk-io-mgr.h | 61 +-- 6 files changed, 279 insertions(+), 332 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h index 30bcd60..a9acca4 100644 --- a/be/src/runtime/disk-io-mgr-internal.h +++ b/be/src/runtime/disk-io-mgr-internal.h @@ -180,17 +180,13 @@ class DiskIoRequestContext { state.ScheduleContext(this, range->disk_id()); } - /// Cancels the context with status code 'status'. + /// Cancels the context with status code 'status' void Cancel(const Status& status); /// Adds request range to disk queue for this request context. Currently, /// schedule_immediately must be false is RequestRange is a write range. void AddRequestRange(RequestRange* range, bool schedule_immediately); - /// Returns the default queue capacity for scan ranges. This is updated - /// as the reader processes ranges. - int initial_scan_range_queue_capacity() const { return initial_queue_capacity_; } - /// Validates invariants of reader. Reader lock must be taken beforehand. bool Validate() const; @@ -265,22 +261,9 @@ class DiskIoRequestContext { /// This is the sum of all queued buffers in all ranges for this reader context. AtomicInt32 num_ready_buffers_; - /// The total (sum) of queue capacities for finished scan ranges. This value - /// divided by num_finished_ranges_ is the average for finished ranges and - /// used to seed the starting queue capacity for future ranges. The assumption - /// is that if previous ranges were fast, new ones will be fast too. The scan - /// range adjusts the queue capacity dynamically so a rough approximation will do. - AtomicInt32 total_range_queue_capacity_; - - /// The initial queue size for new scan ranges. 
This is always - /// total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate - /// variable to allow reading this value without taking a lock. Doing the division - /// at read time (with no lock) could lead to a race where only - /// total_range_queue_capacity_ or num_finished_ranges_ was updated. - int initial_queue_capacity_; - /// All fields below are accessed by multiple threads and the lock needs to be - /// taken before accessing them. + /// taken before accessing them. Must be acquired before ScanRange::lock_ if both + /// are held simultaneously. boost::mutex lock_; /// Current state of the reader http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-reader-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc index 6f3fe77..77f332a 100644 --- a/be/src/runtime/disk-io-mgr-reader-context.cc +++ b/be/src/runtime/disk-io-mgr-reader-context.cc @@ -157,7 +157,6 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) { num_used_buffers_.Store(0); num_buffers_in_reader_.Store(0); num_ready_buffers_.Store(0); - total_range_queue_capacity_.Store(0); num_finished_ranges_.Store(0); num_remote_ranges_.Store(0); bytes_read_local_.Store(0); @@ -166,7 +165,6 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) { unexpected_remote_bytes_.Store(0); cached_file_handles_hit_count_.Store(0); cached_file_handles_miss_count_.Store(0); - initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY; DCHECK(ready_to_start_ranges_.empty()); DCHECK(blocked_ranges_.empty()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc index 8ed5138..c5c9514 100644 --- a/be/src/runtime/disk-io-mgr-scan-range.cc +++ b/be/src/runtime/disk-io-mgr-scan-range.cc @@ -24,11 +24,6 @@ using namespace impala; -// A very large max value to prevent things from going out of control. Not -// expected to ever hit this value (1GB of buffered data per range). -const int MAX_QUEUE_CAPACITY = 128; -const int MIN_QUEUE_CAPACITY = 2; - DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() " "when performing HDFS read operations. This is necessary to use HDFS hedged reads " "(assuming the HDFS client is configured to do so)."); @@ -46,8 +41,9 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u // any time and only one thread will remove from the queue. This is to guarantee // that buffers are queued and read in file order. -// This must be called with the reader lock taken. 
-bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) { +bool DiskIoMgr::ScanRange::EnqueueBuffer( + const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) { + DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); { unique_lock<mutex> scan_range_lock(lock_); DCHECK(Validate()) << DebugString(); @@ -67,13 +63,8 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) { eosr_queued_ = buffer->eosr(); ready_buffers_.emplace_back(move(buffer)); - blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_; - if (blocked_on_queue_ && ready_buffers_capacity_ > MIN_QUEUE_CAPACITY) { - // We have filled the queue, indicating we need back pressure on - // the producer side (i.e. we are pushing buffers faster than they - // are pulled off, throttle this range more). - --ready_buffers_capacity_; - } + DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT); + blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT; } buffer_ready_cv_.notify_one(); @@ -83,18 +74,12 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) { Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { DCHECK(*buffer == nullptr); + bool eosr; { unique_lock<mutex> scan_range_lock(lock_); if (eosr_returned_) return Status::OK(); DCHECK(Validate()) << DebugString(); - if (ready_buffers_.empty()) { - // The queue is empty indicating this thread could use more - // IO. Increase the capacity to allow for more queueing. - ++ready_buffers_capacity_ ; - ready_buffers_capacity_ = ::min(ready_buffers_capacity_, MAX_QUEUE_CAPACITY); - } - while (ready_buffers_.empty() && !is_cancelled_) { buffer_ready_cv_.wait(scan_range_lock); } @@ -106,9 +91,11 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { // Remove the first ready buffer from the queue and return it DCHECK(!ready_buffers_.empty()); + DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT); *buffer = move(ready_buffers_.front()); ready_buffers_.pop_front(); eosr_returned_ = (*buffer)->eosr(); + eosr = (*buffer)->eosr(); } // Update tracking counters. The buffer has now moved from the IoMgr to the @@ -117,6 +104,7 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { reader_->num_buffers_in_reader_.Add(1); reader_->num_ready_buffers_.Add(-1); reader_->num_used_buffers_.Add(-1); + if (eosr) reader_->num_finished_ranges_.Add(1); Status status = (*buffer)->status_; if (!status.ok()) { @@ -125,12 +113,6 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { } unique_lock<mutex> reader_lock(reader_->lock_); - if (eosr_returned_) { - reader_->total_range_queue_capacity_.Add(ready_buffers_capacity_); - reader_->num_finished_ranges_.Add(1); - reader_->initial_queue_capacity_ = reader_->total_range_queue_capacity_.Load() / - reader_->num_finished_ranges_.Load(); - } DCHECK(reader_->Validate()) << endl << reader_->DebugString(); if (reader_->state_ == DiskIoRequestContext::Cancelled) { @@ -140,13 +122,19 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { return status_; } - bool was_blocked = blocked_on_queue_; - blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_; - if (was_blocked && !blocked_on_queue_ && !eosr_queued_) { - // This scan range was blocked and is no longer, add it to the reader - // queue again. 
- reader_->blocked_ranges_.Remove(this); - reader_->ScheduleScanRange(this); + { + // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer() + // may have been called after we released 'lock_' above so we need to re-check + // whether the queue is full. + unique_lock<mutex> scan_range_lock(lock_); + if (blocked_on_queue_ && ready_buffers_.size() < SCAN_RANGE_READY_BUFFER_LIMIT + && !eosr_queued_) { + blocked_on_queue_ = false; + // This scan range was blocked and is no longer, add it to the reader + // queue again. + reader_->blocked_ranges_.Remove(this); + reader_->ScheduleScanRange(this); + } } return Status::OK(); } @@ -191,7 +179,6 @@ string DiskIoMgr::ScanRange::DebugString() const { ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_ << " len=" << len_ << " bytes_read=" << bytes_read_ << " buffer_queue=" << ready_buffers_.size() - << " capacity=" << ready_buffers_capacity_ << " hdfs_file=" << exclusive_hdfs_fh_; return ss.str(); } @@ -211,11 +198,10 @@ bool DiskIoMgr::ScanRange::Validate() { return true; } -DiskIoMgr::ScanRange::ScanRange(int capacity) +DiskIoMgr::ScanRange::ScanRange() : RequestRange(RequestType::READ), num_remote_bytes_(0), external_buffer_tag_(ExternalBufferTag::NO_BUFFER), - ready_buffers_capacity_(capacity), mtime_(-1) {} DiskIoMgr::ScanRange::~ScanRange() { @@ -269,10 +255,6 @@ void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* eosr_queued_= false; eosr_returned_= false; blocked_on_queue_ = false; - if (ready_buffers_capacity_ <= 0) { - ready_buffers_capacity_ = reader->initial_scan_range_queue_capacity(); - DCHECK_GE(ready_buffers_capacity_, MIN_QUEUE_CAPACITY); - } DCHECK(Validate()) << DebugString(); } @@ -525,7 +507,9 @@ Status DiskIoMgr::ScanRange::Read( return Status::OK(); } -Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) { +Status DiskIoMgr::ScanRange::ReadFromCache( + const unique_lock<mutex>& reader_lock, bool* read_succeeded) { + DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); DCHECK(try_cache_); DCHECK_EQ(bytes_read_, 0); *read_succeeded = false; @@ -580,7 +564,7 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) { desc->scan_range_offset_ = 0; desc->eosr_ = true; bytes_read_ = bytes_read; - EnqueueBuffer(move(desc)); + EnqueueBuffer(reader_lock, move(desc)); if (reader_->bytes_read_counter_ != nullptr) { COUNTER_ADD(reader_->bytes_read_counter_, bytes_read); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index 7c60efa..a6a719f 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -48,12 +48,10 @@ namespace impala { class DiskIoMgrTest : public testing::Test { public: - virtual void SetUp() { - pool_.reset(new ObjectPool); - } + virtual void SetUp() {} virtual void TearDown() { - pool_.reset(); + pool_.Clear(); } void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range, DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data, @@ -64,7 +62,7 @@ class DiskIoMgrTest : public testing::Test { EXPECT_EQ(status.code(), expected_status.code()); } if (status.ok()) { - DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange()); + DiskIoMgr::ScanRange* scan_range = pool_.Add(new DiskIoMgr::ScanRange()); scan_range->Reset(NULL, 
(*written_range)->file(), (*written_range)->len(), (*written_range)->offset(), 0, false, DiskIoMgr::BufferOpts::Uncached()); ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data), @@ -165,21 +163,20 @@ class DiskIoMgrTest : public testing::Test { } } - DiskIoMgr::ScanRange* AllocateRange(int num_buffers) { - return pool_->Add(new DiskIoMgr::ScanRange(num_buffers)); + DiskIoMgr::ScanRange* AllocateRange() { + return pool_.Add(new DiskIoMgr::ScanRange); } - DiskIoMgr::ScanRange* InitRange(int num_buffers, const char* file_path, int offset, - int len, int disk_id, int64_t mtime, void* meta_data = NULL, - bool is_cached = false) { - DiskIoMgr::ScanRange* range = AllocateRange(num_buffers); + DiskIoMgr::ScanRange* InitRange(const char* file_path, int offset, int len, + int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) { + DiskIoMgr::ScanRange* range = AllocateRange(); range->Reset(NULL, file_path, len, offset, disk_id, true, DiskIoMgr::BufferOpts(is_cached, mtime), meta_data); EXPECT_EQ(mtime, range->mtime()); return range; } - scoped_ptr<ObjectPool> pool_; + ObjectPool pool_; mutex written_mutex_; condition_variable writes_done_; @@ -211,19 +208,19 @@ TEST_F(DiskIoMgrTest, SingleWriter) { read_io_mgr->RegisterContext(&reader, &reader_mem_tracker); for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.reset(new ObjectPool); + pool_.Clear(); // Destroy scan ranges from previous iterations. DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10); ASSERT_OK(io_mgr.Init(&mem_tracker)); DiskIoRequestContext* writer; io_mgr.RegisterContext(&writer, &mem_tracker); for (int i = 0; i < num_ranges; ++i) { - int32_t* data = pool_->Add(new int32_t); + int32_t* data = pool_.Add(new int32_t); *data = rand(); - DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*); + DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*); DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, new_range, read_io_mgr.get(), reader, data, Status::OK(), _1); - *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset, + *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range)); @@ -253,16 +250,16 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { ASSERT_OK(io_mgr.Init(&mem_tracker)); DiskIoRequestContext* writer; io_mgr.RegisterContext(&writer, NULL); - int32_t* data = pool_->Add(new int32_t); + int32_t* data = pool_.Add(new int32_t); *data = rand(); // Write to file in non-existent directory. 
- DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*); + DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*); DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); - *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback)); + *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range)); @@ -275,12 +272,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { EXPECT_TRUE(false); } - new_range = pool_->Add(new DiskIoMgr::WriteRange*); + new_range = pool_.Add(new DiskIoMgr::WriteRange*); callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); - *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback)); + *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range)); @@ -317,7 +314,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { read_io_mgr->RegisterContext(&reader, &reader_mem_tracker); for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - pool_.reset(new ObjectPool); + pool_.Clear(); // Destroy scan ranges from previous iterations. DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10); ASSERT_OK(io_mgr.Init(&mem_tracker)); DiskIoRequestContext* writer; @@ -328,14 +325,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { io_mgr.CancelContext(writer); validate_status = Status::CANCELLED; } - int32_t* data = pool_->Add(new int32_t); + int32_t* data = pool_.Add(new int32_t); *data = rand(); - DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*); + DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*); DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data, Status::CANCELLED, _1); - *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset, + *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); cur_offset += sizeof(int32_t); @@ -372,41 +369,39 @@ TEST_F(DiskIoMgrTest, SingleReader) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) { - for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) { - pool_.reset(new ObjectPool); - LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk - << " num_disk=" << num_disks << " num_buffers=" << num_buffers - << " num_read_threads=" << num_read_threads; - - if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; - DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); + for (int num_read_threads = 1; num_read_threads <= 5; 
++num_read_threads) { + ObjectPool pool; + LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk + << " num_disk=" << num_disks + << " num_read_threads=" << num_read_threads; - ASSERT_OK(io_mgr.Init(&mem_tracker)); - MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; + DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); - vector<DiskIoMgr::ScanRange*> ranges; - for (int i = 0; i < len; ++i) { - int disk_id = i % num_disks; - ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id, - stat_val.st_mtime)); - } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.Init(&mem_tracker)); + MemTracker reader_mem_tracker; + DiskIoRequestContext* reader; + io_mgr.RegisterContext(&reader, &reader_mem_tracker); - AtomicInt32 num_ranges_processed; - thread_group threads; - for (int i = 0; i < num_read_threads; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, - len, Status::OK(), 0, &num_ranges_processed)); - } - threads.join_all(); + vector<DiskIoMgr::ScanRange*> ranges; + for (int i = 0; i < len; ++i) { + int disk_id = i % num_disks; + ranges.push_back( + InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); + } + ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); - io_mgr.UnregisterContext(reader); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); + AtomicInt32 num_ranges_processed; + thread_group threads; + for (int i = 0; i < num_read_threads; ++i) { + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + len, Status::OK(), 0, &num_ranges_processed)); } + threads.join_all(); + + EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); + io_mgr.UnregisterContext(reader); + EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } } @@ -428,56 +423,53 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) { - pool_.reset(new ObjectPool); - LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk - << " num_disk=" << num_disks << " num_buffers=" << num_buffers; - - if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; - DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); + pool_.Clear(); // Destroy scan ranges from previous iterations. 
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk + << " num_disk=" << num_disks; - ASSERT_OK(io_mgr.Init(&mem_tracker)); - MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; + DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); - vector<DiskIoMgr::ScanRange*> ranges_first_half; - vector<DiskIoMgr::ScanRange*> ranges_second_half; - for (int i = 0; i < len; ++i) { - int disk_id = i % num_disks; - if (i > len / 2) { - ranges_second_half.push_back( - InitRange(num_buffers, tmp_file, i, 1, disk_id, - stat_val.st_mtime)); - } else { - ranges_first_half.push_back(InitRange(num_buffers, tmp_file, i, 1, disk_id, - stat_val.st_mtime)); - } + ASSERT_OK(io_mgr.Init(&mem_tracker)); + MemTracker reader_mem_tracker; + DiskIoRequestContext* reader; + io_mgr.RegisterContext(&reader, &reader_mem_tracker); + + vector<DiskIoMgr::ScanRange*> ranges_first_half; + vector<DiskIoMgr::ScanRange*> ranges_second_half; + for (int i = 0; i < len; ++i) { + int disk_id = i % num_disks; + if (i > len / 2) { + ranges_second_half.push_back( + InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime)); + } else { + ranges_first_half.push_back( + InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime)); } - AtomicInt32 num_ranges_processed; - - // Issue first half the scan ranges. - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half)); + } + AtomicInt32 num_ranges_processed; - // Read a couple of them - ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2, - &num_ranges_processed); + // Issue first half the scan ranges. + ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half)); - // Issue second half - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half)); + // Read a couple of them + ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2, + &num_ranges_processed); - // Start up some threads and then cancel - thread_group threads; - for (int i = 0; i < 3; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, - strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); - } + // Issue second half + ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half)); - threads.join_all(); - EXPECT_EQ(num_ranges_processed.Load(), len); - io_mgr.UnregisterContext(reader); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); + // Start up some threads and then cancel + thread_group threads; + for (int i = 0; i < 3; ++i) { + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); } + + threads.join_all(); + EXPECT_EQ(num_ranges_processed.Load(), len); + io_mgr.UnregisterContext(reader); + EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } EXPECT_EQ(mem_tracker.consumption(), 0); @@ -500,57 +492,55 @@ TEST_F(DiskIoMgrTest, SyncReadTest) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) { - pool_.reset(new ObjectPool); - LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk - << " num_disk=" << num_disks << " num_buffers=" << num_buffers; + pool_.Clear(); // Destroy scan ranges from previous iterations. 
+ LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk + << " num_disk=" << num_disks; - if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; - DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, - MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); + if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; + DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, + MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); - ASSERT_OK(io_mgr.Init(&mem_tracker)); - MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + ASSERT_OK(io_mgr.Init(&mem_tracker)); + MemTracker reader_mem_tracker; + DiskIoRequestContext* reader; + io_mgr.RegisterContext(&reader, &reader_mem_tracker); - DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0, - stat_val.st_mtime); + DiskIoMgr::ScanRange* complete_range = InitRange(tmp_file, 0, strlen(data), 0, + stat_val.st_mtime); - // Issue some reads before the async ones are issued - ValidateSyncRead(&io_mgr, reader, complete_range, data); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + // Issue some reads before the async ones are issued + ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader, complete_range, data); - vector<DiskIoMgr::ScanRange*> ranges; - for (int i = 0; i < len; ++i) { - int disk_id = i % num_disks; - ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id, - stat_val.st_mtime)); - } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + vector<DiskIoMgr::ScanRange*> ranges; + for (int i = 0; i < len; ++i) { + int disk_id = i % num_disks; + ranges.push_back(InitRange(tmp_file, 0, len, disk_id, + stat_val.st_mtime)); + } + ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - AtomicInt32 num_ranges_processed; - thread_group threads; - for (int i = 0; i < 5; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, - strlen(data), Status::OK(), 0, &num_ranges_processed)); - } + AtomicInt32 num_ranges_processed; + thread_group threads; + for (int i = 0; i < 5; ++i) { + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + strlen(data), Status::OK(), 0, &num_ranges_processed)); + } - // Issue some more sync ranges - for (int i = 0; i < 5; ++i) { - sched_yield(); - ValidateSyncRead(&io_mgr, reader, complete_range, data); - } + // Issue some more sync ranges + for (int i = 0; i < 5; ++i) { + sched_yield(); + ValidateSyncRead(&io_mgr, reader, complete_range, data); + } - threads.join_all(); + threads.join_all(); - ValidateSyncRead(&io_mgr, reader, complete_range, data); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader, complete_range, data); - EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); - io_mgr.UnregisterContext(reader); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); - } + EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); + io_mgr.UnregisterContext(reader); + EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } EXPECT_EQ(mem_tracker.consumption(), 0); @@ -571,51 +561,48 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { int64_t iters = 0; for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) { - pool_.reset(new ObjectPool); - LOG(INFO) << 
"Starting test with num_threads_per_disk=" << num_threads_per_disk - << " num_disk=" << num_disks << " num_buffers=" << num_buffers; - - if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; - DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); + pool_.Clear(); // Destroy scan ranges from previous iterations. + LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk + << " num_disk=" << num_disks; - ASSERT_OK(io_mgr.Init(&mem_tracker)); - MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; + DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1); - vector<DiskIoMgr::ScanRange*> ranges; - for (int i = 0; i < len; ++i) { - int disk_id = i % num_disks; - ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id, - stat_val.st_mtime)); - } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.Init(&mem_tracker)); + MemTracker reader_mem_tracker; + DiskIoRequestContext* reader; + io_mgr.RegisterContext(&reader, &reader_mem_tracker); + + vector<DiskIoMgr::ScanRange*> ranges; + for (int i = 0; i < len; ++i) { + int disk_id = i % num_disks; + ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); + } + ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - AtomicInt32 num_ranges_processed; - int num_succesful_ranges = ranges.size() / 2; - // Read half the ranges - for (int i = 0; i < num_succesful_ranges; ++i) { - ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1, - &num_ranges_processed); - } - EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges); + AtomicInt32 num_ranges_processed; + int num_succesful_ranges = ranges.size() / 2; + // Read half the ranges + for (int i = 0; i < num_succesful_ranges; ++i) { + ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1, + &num_ranges_processed); + } + EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges); - // Start up some threads and then cancel - thread_group threads; - for (int i = 0; i < 3; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, - strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); - } + // Start up some threads and then cancel + thread_group threads; + for (int i = 0; i < 3; ++i) { + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); + } - io_mgr.CancelContext(reader); - sched_yield(); + io_mgr.CancelContext(reader); + sched_yield(); - threads.join_all(); - EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled()); - io_mgr.UnregisterContext(reader); - EXPECT_EQ(reader_mem_tracker.consumption(), 0); - } + threads.join_all(); + EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled()); + io_mgr.UnregisterContext(reader); + EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } EXPECT_EQ(mem_tracker.consumption(), 0); @@ -632,15 +619,10 @@ TEST_F(DiskIoMgrTest, MemLimits) { struct stat stat_val; stat(tmp_file, &stat_val); - const int num_buffers = 25; - // Give the reader more buffers than the limit const int mem_limit_num_buffers = 2; - - int64_t iters = 0; + // Allocate enough ranges so that the total buffers exceeds the mem limit. 
+ const int num_ranges = 25; { - pool_.reset(new ObjectPool); - if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters; - MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE); DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); @@ -650,9 +632,8 @@ TEST_F(DiskIoMgrTest, MemLimits) { io_mgr.RegisterContext(&reader, &reader_mem_tracker); vector<DiskIoMgr::ScanRange*> ranges; - for (int i = 0; i < num_buffers; ++i) { - ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, 0, - stat_val.st_mtime)); + for (int i = 0; i < num_ranges; ++i) { + ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime)); } ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); @@ -711,12 +692,7 @@ TEST_F(DiskIoMgrTest, CachedReads) { stat(tmp_file, &stat_val); const int num_disks = 2; - const int num_buffers = 3; - - int64_t iters = 0; { - pool_.reset(new ObjectPool); - if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init(&mem_tracker)); @@ -725,7 +701,7 @@ TEST_F(DiskIoMgrTest, CachedReads) { io_mgr.RegisterContext(&reader, &reader_mem_tracker); DiskIoMgr::ScanRange* complete_range = - InitRange(1, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true); + InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true); // Issue some reads before the async ones are issued ValidateSyncRead(&io_mgr, reader, complete_range, data); @@ -734,8 +710,8 @@ TEST_F(DiskIoMgrTest, CachedReads) { vector<DiskIoMgr::ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id, - stat_val.st_mtime, NULL, true)); + ranges.push_back( + InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, NULL, true)); } ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); @@ -797,7 +773,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { for (int file_index = 0; file_index < num_contexts; ++file_index) { io_mgr.RegisterContext(&contexts[file_index], &mem_tracker); } - pool_.reset(new ObjectPool); + pool_.Clear(); int read_offset = 0; int write_offset = 0; while (read_offset < file_size) { @@ -808,7 +784,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { vector<DiskIoMgr::ScanRange*> ranges; int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset); for (int i = 0; i < num_scan_ranges; ++i) { - ranges.push_back(InitRange(1, file_name.c_str(), read_offset, 1, + ranges.push_back(InitRange(file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime)); threads.add_thread(new thread(ScanRangeThread, &io_mgr, contexts[context_index], @@ -823,7 +799,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback), this, num_write_ranges, _1); - DiskIoMgr::WriteRange* new_range = pool_->Add( + DiskIoMgr::WriteRange* new_range = pool_.Add( new DiskIoMgr::WriteRange(file_name, write_offset, i % num_disks, callback)); new_range->SetData(reinterpret_cast<const uint8_t*> @@ -899,44 +875,41 @@ TEST_F(DiskIoMgrTest, MultipleReader) { for (int iteration = 0; iteration < ITERATIONS; ++iteration) { for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { - for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) { - pool_.reset(new ObjectPool); - LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk - << " 
num_disk=" << num_disks << " num_buffers=" << num_buffers; - if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters; + pool_.Clear(); // Destroy scan ranges from previous iterations. + LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk + << " num_disk=" << num_disks; + if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters; - DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE, - MAX_BUFFER_SIZE); - EXPECT_OK(io_mgr.Init(&mem_tracker)); + DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE, + MAX_BUFFER_SIZE); + EXPECT_OK(io_mgr.Init(&mem_tracker)); - for (int i = 0; i < NUM_READERS; ++i) { - io_mgr.RegisterContext(&readers[i], &mem_tracker); + for (int i = 0; i < NUM_READERS; ++i) { + io_mgr.RegisterContext(&readers[i], &mem_tracker); - vector<DiskIoMgr::ScanRange*> ranges; - for (int j = 0; j < DATA_LEN; ++j) { - int disk_id = j % num_disks; - ranges.push_back( - InitRange(num_buffers,file_names[i].c_str(), j, 1, disk_id, - mtimes[i])); - } - ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges)); + vector<DiskIoMgr::ScanRange*> ranges; + for (int j = 0; j < DATA_LEN; ++j) { + int disk_id = j % num_disks; + ranges.push_back( + InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i])); } + ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges)); + } - AtomicInt32 num_ranges_processed; - thread_group threads; - for (int i = 0; i < NUM_READERS; ++i) { - for (int j = 0; j < NUM_THREADS_PER_READER; ++j) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i], - data[i].c_str(), data[i].size(), Status::OK(), 0, - &num_ranges_processed)); - } - } - threads.join_all(); - EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS); - for (int i = 0; i < NUM_READERS; ++i) { - io_mgr.UnregisterContext(readers[i]); + AtomicInt32 num_ranges_processed; + thread_group threads; + for (int i = 0; i < NUM_READERS; ++i) { + for (int j = 0; j < NUM_THREADS_PER_READER; ++j) { + threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i], + data[i].c_str(), data[i].size(), Status::OK(), 0, + &num_ranges_processed)); } } + threads.join_all(); + EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS); + for (int i = 0; i < NUM_READERS; ++i) { + io_mgr.UnregisterContext(readers[i]); + } } } } @@ -966,7 +939,7 @@ TEST_F(DiskIoMgrTest, Buffers) { DiskIoRequestContext* reader; io_mgr.RegisterContext(&reader, &reader_mem_tracker); - DiskIoMgr::ScanRange* dummy_range = InitRange(1, "dummy", 0, 0, 0, 0); + DiskIoMgr::ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0); // buffer length should be rounded up to min buffer size int64_t buffer_len = 1; @@ -1043,7 +1016,7 @@ TEST_F(DiskIoMgrTest, PartialRead) { io_mgr->RegisterContext(&reader, &reader_mem_tracker); // We should not read past the end of file. 
- DiskIoMgr::ScanRange* range = InitRange(1, tmp_file, 0, read_len, 0, stat_val.st_mtime); + DiskIoMgr::ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime); unique_ptr<DiskIoMgr::BufferDescriptor> buffer; ASSERT_OK(io_mgr->Read(reader, range, &buffer)); ASSERT_TRUE(buffer->eosr()); @@ -1052,7 +1025,7 @@ TEST_F(DiskIoMgrTest, PartialRead) { io_mgr->ReturnBuffer(move(buffer)); io_mgr->UnregisterContext(reader); - pool_.reset(); + pool_.Clear(); io_mgr.reset(); EXPECT_EQ(reader_mem_tracker.consumption(), 0); EXPECT_EQ(mem_tracker.consumption(), 0); @@ -1078,7 +1051,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { for (int buffer_len : vector<int>({len - 1, len, len + 1})) { vector<uint8_t> client_buffer(buffer_len); int scan_len = min(len, buffer_len); - DiskIoMgr::ScanRange* range = AllocateRange(1); + DiskIoMgr::ScanRange* range = AllocateRange(); range->Reset(NULL, tmp_file, scan_len, 0, 0, true, DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), buffer_len)); ASSERT_OK(io_mgr->AddScanRange(reader, range, true)); @@ -1096,7 +1069,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { } io_mgr->UnregisterContext(reader); - pool_.reset(); + pool_.Clear(); io_mgr.reset(); EXPECT_EQ(mem_tracker.consumption(), 0); } @@ -1116,7 +1089,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { vector<uint8_t> client_buffer(SCAN_LEN); for (int i = 0; i < 1000; ++i) { io_mgr->RegisterContext(&reader, reader_mem_tracker); - DiskIoMgr::ScanRange* range = AllocateRange(1); + DiskIoMgr::ScanRange* range = AllocateRange(); range->Reset(NULL, tmp_file, SCAN_LEN, 0, 0, true, DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN)); ASSERT_OK(io_mgr->AddScanRange(reader, range, true)); @@ -1134,7 +1107,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { io_mgr->UnregisterContext(reader); } - pool_.reset(); + pool_.Clear(); io_mgr.reset(); EXPECT_EQ(mem_tracker.consumption(), 0); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index e77d9ca..7cc2af7 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -118,7 +118,7 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds, // current queue size. static const int LOW_MEMORY = 64 * 1024 * 1024; -const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2; +const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; AtomicInt32 DiskIoMgr::next_disk_id_; @@ -583,7 +583,7 @@ Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader, if (range->try_cache_) { if (schedule_immediately) { bool cached_read_succeeded; - RETURN_IF_ERROR(range->ReadFromCache(&cached_read_succeeded)); + RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded)); if (cached_read_succeeded) continue; // Cached read failed, fall back to AddRequestRange() below. } else { @@ -633,7 +633,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) *range = reader->cached_ranges_.Dequeue(); DCHECK((*range)->try_cache_); bool cached_read_succeeded; - RETURN_IF_ERROR((*range)->ReadFromCache(&cached_read_succeeded)); + RETURN_IF_ERROR((*range)->ReadFromCache(reader_lock, &cached_read_succeeded)); if (cached_read_succeeded) return Status::OK(); // This range ended up not being cached. Loop again and pick up a new range. 
@@ -994,7 +994,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* ScanRange* scan_range = buffer->scan_range_; scan_range->Cancel(reader->status_); // Enqueue the buffer to use the scan range's buffer cleanup path. - scan_range->EnqueueBuffer(move(buffer)); + scan_range->EnqueueBuffer(reader_lock, move(buffer)); return; } @@ -1021,7 +1021,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* bool eosr = buffer->eosr_; ScanRange* scan_range = buffer->scan_range_; bool is_cached = buffer->is_cached(); - bool queue_full = scan_range->EnqueueBuffer(move(buffer)); + bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer)); if (eosr) { // For cached buffers, we can't close the range until the cached buffer is returned. // Close() is called from DiskIoMgr::ReturnBuffer(). http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index 138f973..ed33942 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -129,16 +129,12 @@ class MemTracker; /// To have multiple reading threads, the caller would simply spin up the threads /// and each would process the loops above. // -/// To control the number of IO buffers, each scan range has a soft max capacity for -/// the number of queued buffers. If the number of buffers is at capacity, the IoMgr -/// will no longer read for that scan range until the caller has processed a buffer. -/// This capacity does not need to be fixed, and the caller can dynamically adjust -/// it if necessary. -// -/// As an example: If we allowed 5 buffers per range on a 24 core, 72 thread -/// (we default to allowing 3x threads) machine, we should see at most -/// 72 * 5 * 8MB = 2.8GB in io buffers memory usage. This should remain roughly constant -/// regardless of how many concurrent readers are running. +/// To control the number of IO buffers, each scan range has a limit of two queued +/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity, +/// the IoMgr will no longer read for that scan range until the caller has processed +/// a buffer. Assuming the client returns each buffer before requesting the next one +/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per +/// scan range. // /// Buffer Management: /// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller, @@ -382,8 +378,7 @@ class DiskIoMgr : public CacheLineAligned { /// the IoMgr. class ScanRange : public RequestRange { public: - /// The initial queue capacity for this. Specify -1 to use IoMgr default. - ScanRange(int initial_capacity = -1); + ScanRange(); virtual ~ScanRange(); @@ -401,7 +396,6 @@ class DiskIoMgr : public CacheLineAligned { void* meta_data() const { return meta_data_; } bool try_cache() const { return try_cache_; } bool expected_local() const { return expected_local_; } - int ready_buffers_capacity() const { return ready_buffers_capacity_; } /// Returns the next buffer for this scan range. buffer is an output parameter. /// This function blocks until a buffer is ready or an error occurred. If this is @@ -431,8 +425,10 @@ class DiskIoMgr : public CacheLineAligned { /// Enqueues a buffer for this range. This does not block. /// Returns true if this scan range has hit the queue capacity, false otherwise. 
/// The caller passes ownership of buffer to the scan range and it is not - /// valid to access buffer after this call. - bool EnqueueBuffer(std::unique_ptr<BufferDescriptor> buffer); + /// valid to access buffer after this call. The reader lock must be held by the + /// caller. + bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock, + std::unique_ptr<BufferDescriptor> buffer); /// Cleanup any queued buffers (i.e. due to cancellation). This cannot /// be called with any locks taken. @@ -475,7 +471,9 @@ class DiskIoMgr : public CacheLineAligned { /// and *read_succeeded to true. /// If the data is not cached, returns ok() and *read_succeeded is set to false. /// Returns a non-ok status if it ran into a non-continuable error. - Status ReadFromCache(bool* read_succeeded) WARN_UNUSED_RESULT; + /// The reader lock must be held by the caller. + Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock, + bool* read_succeeded) WARN_UNUSED_RESULT; /// Pointer to caller specified metadata. This is untouched by the io manager /// and the caller can put whatever auxiliary data in here. @@ -540,7 +538,9 @@ class DiskIoMgr : public CacheLineAligned { }; /// Lock protecting fields below. - /// This lock should not be taken during Open/Read/Close. + /// This lock should not be taken during Open()/Read()/Close(). + /// If DiskIoRequestContext::lock_ and this lock need to be held simultaneously, + /// DiskIoRequestContext::lock_ must be taken first. boost::mutex lock_; /// Number of bytes read so far for this scan range @@ -566,12 +566,6 @@ class DiskIoMgr : public CacheLineAligned { boost::condition_variable buffer_ready_cv_; std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; - /// The soft capacity limit for ready_buffers_. ready_buffers_ can exceed - /// the limit temporarily as the capacity is adjusted dynamically. - /// In that case, the capcity is only realized when the caller removes buffers - /// from ready_buffers_. - int ready_buffers_capacity_; - /// Lock that should be taken during hdfs calls. Only one thread (the disk reading /// thread) calls into hdfs at a time so this lock does not have performance impact. /// This lock only serves to coordinate cleanup. Specifically it serves to ensure @@ -799,9 +793,24 @@ class DiskIoMgr : public CacheLineAligned { /// 'bytes_to_free' is -1. void GcIoBuffers(int64_t bytes_to_free = -1); - /// Default ready buffer queue capacity. This constant doesn't matter too much - /// since the system dynamically adjusts. - static const int DEFAULT_QUEUE_CAPACITY; + /// The maximum number of ready buffers that can be queued in a scan range. Having two + /// queued buffers (plus the buffer that is returned to the client) gives good + /// performance in most scenarios: + /// 1. If the consumer is consuming data faster than we can read from disk, then the + /// queue will be empty most of the time because the buffer will be immediately + /// pulled off the queue as soon as it is added. There will always be an I/O request + /// in the disk queue to maximize I/O throughput, which is the bottleneck in this + /// case. + /// 2. If we can read from disk faster than the consumer is consuming data, the queue + /// will fill up and there will always be a buffer available for the consumer to + /// read, so the consumer will not block and we maximize consumer throughput, which + /// is the bottleneck in this case. + /// 3. 
If the consumer is consuming data at approximately the same rate as we are + /// reading from disk, then the steady state is that the consumer is processing one + /// buffer and one buffer is in the disk queue. The additional buffer can absorb + /// bursts where the producer runs faster than the consumer or the consumer runs + /// faster than the producer without blocking either the producer or consumer. + static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2; /// "Disk" queue offsets for remote accesses. Offset 0 corresponds to /// disk ID (i.e. disk_queue_ index) of num_local_disks().
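For reference, a hypothetical consumer loop (illustrative only, not part of this patch) shows why at most SCAN_RANGE_READY_BUFFER_LIMIT + 1 buffers are ever live per scan range: up to two sit in the ready queue while one is held by the caller between GetNext() and ReturnBuffer(). With the 8MB maximum buffer size that bounds per-range I/O buffer memory at roughly 3 * 8MB = 24MB, matching the figure in the header comment above.

// Hypothetical client code, not from the patch: drain one scan range.
Status ConsumeScanRange(DiskIoMgr* io_mgr, DiskIoMgr::ScanRange* range) {
  while (true) {
    std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
    // Blocks until the disk threads have queued a buffer (or the range is cancelled).
    RETURN_IF_ERROR(range->GetNext(&buffer));
    if (buffer == nullptr) return Status::OK();  // eosr was already returned earlier
    bool eosr = buffer->eosr();
    // ... process the buffer's contents here ...
    // Returning the buffer frees its queue slot, so a blocked range can be
    // rescheduled and the disk thread can read ahead again.
    io_mgr->ReturnBuffer(std::move(buffer));
    if (eosr) return Status::OK();
  }
}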
