http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 0ede5b5..8933fec 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "common/global-flags.h"
 #include "runtime/io/disk-io-mgr.h"
+
+#include "common/global-flags.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/handle-cache.inline.h"
 #include "runtime/io/error-converter.h"
@@ -53,6 +55,8 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
 static const int THREADS_PER_ROTATIONAL_DISK = 1;
 static const int THREADS_PER_SOLID_STATE_DISK = 8;
 
+const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE;
+
 // The maximum number of the threads per rotational disk is also the max queue depth per
 // rotational disk.
 static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
@@ -123,13 +127,6 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds,
 DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used by the "
     "file handle cache.");
 
-// The IoMgr is able to run with a wide range of memory usage. If a query has memory
-// remaining less than this value, the IoMgr will stop all buffering regardless of the
-// current queue size.
-static const int LOW_MEMORY = 64 * 1024 * 1024;
-
-const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
-
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
 namespace detail {
@@ -156,34 +153,6 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
-    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
-    int64_t buffer_len, MemTracker* mem_tracker)
-  : io_mgr_(io_mgr),
-    reader_(reader),
-    mem_tracker_(mem_tracker),
-    scan_range_(scan_range),
-    buffer_(buffer),
-    buffer_len_(buffer_len) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(buffer != nullptr);
-  DCHECK_GE(buffer_len, 0);
-  DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
-      mem_tracker == nullptr);
-}
-
-void BufferDescriptor::TransferOwnership(MemTracker* dst) {
-  DCHECK(dst != nullptr);
-  DCHECK(!is_client_buffer());
-  // Memory of cached buffers is not tracked against a tracker.
-  if (is_cached()) return;
-  DCHECK(mem_tracker_ != nullptr);
-  dst->Consume(buffer_len_);
-  mem_tracker_->Release(buffer_len_);
-  mem_tracker_ = dst;
-}
-
 WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
@@ -224,8 +193,8 @@ DiskIoMgr::DiskIoMgr() :
     num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
         FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
         THREADS_PER_SOLID_STATE_DISK)),
-    max_buffer_size_(FLAGS_read_size),
-    min_buffer_size_(FLAGS_min_buffer_size),
+    max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)),
+    min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
@@ -234,8 +203,6 @@ DiskIoMgr::DiskIoMgr() :
     FLAGS_num_file_handle_cache_partitions, FLAGS_unused_file_handle_timeout_sec) {
   DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
-  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
-  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
   if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
     LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
@@ -250,11 +217,11 @@ DiskIoMgr::DiskIoMgr() :
 }
 
 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
-    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
+    int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size) :
   num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
   num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
-  max_buffer_size_(max_buffer_size),
-  min_buffer_size_(min_buffer_size),
+  max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)),
+  min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)),
   shut_down_(false),
   total_bytes_read_counter_(TUnit::BYTES),
   read_timer_(TUnit::TIME_NS),
@@ -262,8 +229,6 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
     FileSystemUtil::MaxNumFileHandles()),
     FLAGS_num_file_handle_cache_partitions, FLAGS_unused_file_handle_timeout_sec) {
-  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
-  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
@@ -288,37 +253,22 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == nullptr) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
-        it != disk_queues_[i]->request_contexts.end(); ++it) {
-      DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
-      DCHECK((*it)->disk_states_[disk_id].done());
-      (*it)->DecrementDiskRefCount();
+    for (RequestContext* context : disk_queues_[i]->request_contexts) {
+      unique_lock<mutex> context_lock(context->lock_);
+      DCHECK_EQ(context->disk_states_[disk_id].num_threads_in_op(), 0);
+      DCHECK(context->disk_states_[disk_id].done());
+      context->DecrementDiskRefCount(context_lock);
     }
   }
 
-  DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
-
-  // Delete all allocated buffers
-  int num_free_buffers = 0;
-  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    num_free_buffers += free_buffers_[idx].size();
-  }
-  DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
-  GcIoBuffers();
-
   for (int i = 0; i < disk_queues_.size(); ++i) {
     delete disk_queues_[i];
   }
 
-  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
   if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
 }
 
-Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
-  DCHECK(process_mem_tracker != nullptr);
-  free_buffer_mem_tracker_.reset(
-      new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-
+Status DiskIoMgr::Init() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     disk_queues_[i] = new DiskQueue(i);
     int num_threads_per_disk;
@@ -364,101 +314,14 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   return Status::OK();
 }
 
-unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
-  return unique_ptr<RequestContext>(
-      new RequestContext(this, num_total_disks(), mem_tracker));
+unique_ptr<RequestContext> DiskIoMgr::RegisterContext() {
+  return unique_ptr<RequestContext>(new RequestContext(this, num_total_disks()));
 }
 
 void DiskIoMgr::UnregisterContext(RequestContext* reader) {
   reader->CancelAndMarkInactive();
 }
 
-// Cancellation requires coordination from multiple threads. Each thread that currently
-// has a reference to the request context must notice the cancel and remove it from its
-// tracking structures. The last thread to touch the context should deallocate (aka
-// recycle) the request context object. Potential threads are:
-//  1. Disk threads that are currently reading for this reader.
-//  2. Caller threads that are waiting in GetNext.
-//
-// The steps are:
-// 1. Cancel will immediately set the context in the Cancelled state. This prevents any
-// other thread from adding more ready buffers to the context (they all take a lock and
-// check the state before doing so), or any write ranges to the context.
-// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
-// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
-// also invokes the callback for the WriteRanges with the cancelled state.
-// 3. Disk threads notice the context is cancelled either when picking the next context
-// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
-// state, removes the context from the disk queue. The last thread per disk with an
-// outstanding reference to the context decrements the number of disk queues the context
-// is on.
-void DiskIoMgr::CancelContext(RequestContext* context) {
-  context->Cancel(Status::CANCELLED);
-}
-
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
-  r->read_timer_ = c;
-}
-
-void DiskIoMgr::set_open_file_timer(RequestContext* r, RuntimeProfile::Counter* c) {
-  r->open_file_timer_ = c;
-}
-
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
-  r->bytes_read_counter_ = c;
-}
-
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
-    RuntimeProfile::Counter* c) {
-  r->active_read_thread_counter_ = c;
-}
-
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
-    RuntimeProfile::Counter* c) {
-  r->disks_accessed_bitmap_ = c;
-}
-
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
-  return reader->num_ready_buffers_.Load();
-}
-
-Status DiskIoMgr::context_status(RequestContext* context) const {
-  unique_lock<mutex> lock(context->lock_);
-  return context->status_;
-}
-
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
-  return reader->bytes_read_local_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
-  return reader->bytes_read_short_circuit_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
-  return reader->bytes_read_dn_cache_.Load();
-}
-
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
-  return reader->num_remote_ranges_.Load();
-}
-
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
-  return reader->unexpected_remote_bytes_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const {
-  return reader->cached_file_handles_hit_count_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const {
-  return reader->cached_file_handles_miss_count_.Load();
-}
-
-int64_t DiskIoMgr::GetReadThroughput() {
-  return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
-}
-
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
@@ -469,84 +332,91 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
     return Status(TErrorCode::DISK_IO_ERROR,
         Substitute("Invalid scan range. Negative offset $0", range->offset_));
   }
-  if (range->len_ < 0) {
+  if (range->len_ <= 0) {
     return Status(TErrorCode::DISK_IO_ERROR,
-        Substitute("Invalid scan range. Negative length $0", range->len_));
+        Substitute("Invalid scan range. Non-positive length $0", range->len_));
   }
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
-    const vector<ScanRange*>& ranges, bool schedule_immediately) {
-  if (ranges.empty()) return Status::OK();
-
+Status DiskIoMgr::AddScanRanges(
+    RequestContext* reader, const vector<ScanRange*>& ranges) {
+  DCHECK_GT(ranges.size(), 0);
   // Validate and initialize all ranges
   for (int i = 0; i < ranges.size(); ++i) {
     RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
     ranges[i]->InitInternal(this, reader);
   }
 
-  // disks that this reader needs to be scheduled on.
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
-  if (reader->state_ == RequestContext::Cancelled) {
-    DCHECK(!reader->status_.ok());
-    return reader->status_;
-  }
+  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
   // Add each range to the queue of the disk the range is on
-  for (int i = 0; i < ranges.size(); ++i) {
+  for (ScanRange* range : ranges) {
     // Don't add empty ranges.
-    DCHECK_NE(ranges[i]->len(), 0);
-    ScanRange* range = ranges[i];
-
+    DCHECK_NE(range->len(), 0);
+    reader->AddActiveScanRangeLocked(reader_lock, range);
    if (range->try_cache_) {
-      if (schedule_immediately) {
-        bool cached_read_succeeded;
-        RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
-        if (cached_read_succeeded) continue;
-        // Cached read failed, fall back to AddRequestRange() below.
-      } else {
-        reader->cached_ranges_.Enqueue(range);
-        continue;
-      }
+      reader->cached_ranges_.Enqueue(range);
+    } else {
+      reader->AddRangeToDisk(reader_lock, range, ScheduleMode::UPON_GETNEXT);
    }
-    reader->AddRequestRange(range, schedule_immediately);
  }
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRange(
-    RequestContext* reader, ScanRange* range, bool schedule_immediately) {
-  return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately);
+Status DiskIoMgr::StartScanRange(RequestContext* reader, ScanRange* range,
+    bool* needs_buffers) {
+  RETURN_IF_ERROR(ValidateScanRange(range));
+  range->InitInternal(this, reader);
+
+  unique_lock<mutex> reader_lock(reader->lock_);
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+
+  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+
+  DCHECK_NE(range->len(), 0);
+  if (range->try_cache_) {
+    bool cached_read_succeeded;
+    RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
+    if (cached_read_succeeded) {
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      *needs_buffers = false;
+      return Status::OK();
+    }
+    // Cached read failed, fall back to normal read path.
+  }
+  // If we don't have a buffer yet, the caller must allocate buffers for the range.
+  *needs_buffers = range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER;
+  if (*needs_buffers) range->SetBlockedOnBuffer();
+  reader->AddActiveScanRangeLocked(reader_lock, range);
+  reader->AddRangeToDisk(reader_lock, range,
+      *needs_buffers ? ScheduleMode::BY_CALLER : ScheduleMode::IMMEDIATELY);
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  return Status::OK();
 }
 
 // This function returns the next scan range the reader should work on, checking
 // for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
+    bool* needs_buffers) {
   DCHECK(reader != nullptr);
   DCHECK(range != nullptr);
   *range = nullptr;
-  Status status = Status::OK();
+  *needs_buffers = false;
 
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-
   while (true) {
-    if (reader->state_ == RequestContext::Cancelled) {
-      DCHECK(!reader->status_.ok());
-      status = reader->status_;
-      break;
-    }
+    if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
     if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
         reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
       // All ranges are done, just return.
-      break;
+      return Status::OK();
    }
 
     if (!reader->cached_ranges_.empty()) {
@@ -558,7 +428,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new range.
-      reader->AddRequestRange(*range, false);
+      reader->AddRangeToDisk(reader_lock, *range, ScheduleMode::UPON_GETNEXT);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       *range = nullptr;
       continue;
@@ -574,183 +444,72 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
       // Set this to nullptr, the next time this disk runs for this reader, it will
       // get another range ready.
       reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
-      reader->ScheduleScanRange(*range);
-      break;
+      ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag_;
+      if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
+        // We can't schedule this range until the client gives us buffers. The context
+        // must be rescheduled regardless to ensure that 'next_scan_range_to_start' is
+        // refilled.
+        reader->disk_states_[disk_id].ScheduleContext(reader_lock, reader, disk_id);
+        (*range)->SetBlockedOnBuffer();
+        *needs_buffers = true;
+      } else {
+        reader->ScheduleScanRange(reader_lock, *range);
+      }
+      return Status::OK();
    }
  }
-  return status;
 }
 
-Status DiskIoMgr::Read(RequestContext* reader,
-    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
-  DCHECK(range != nullptr);
-  DCHECK(buffer != nullptr);
-  *buffer = nullptr;
-
-  if (range->len() > max_buffer_size_
-      && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
-        "perform sync read of '$0' bytes that is larger than the max read buffer size "
-        "'$1'.", range->len(), max_buffer_size_));
-  }
-
-  vector<ScanRange*> ranges;
-  ranges.push_back(range);
-  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
-  RETURN_IF_ERROR(range->GetNext(buffer));
-  DCHECK((*buffer) != nullptr);
-  DCHECK((*buffer)->eosr());
+Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader,
+    BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) {
+  DCHECK_GE(max_bytes, min_buffer_size_);
+  DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+      << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers "
+      << "when already reading into an external buffer";
+  BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
+  Status status;
+  vector<unique_ptr<BufferDescriptor>> buffers;
+  for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
+    BufferPool::BufferHandle handle;
+    status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
+    if (!status.ok()) goto error;
+    buffers.emplace_back(new BufferDescriptor(
+        this, reader, range, bp_client, move(handle)));
+  }
+  range->AddUnusedBuffers(move(buffers), false);
   return Status::OK();
+ error:
+  DCHECK(!status.ok());
+  range->CleanUpBuffers(move(buffers));
+  return status;
 }
 
-void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
-  DCHECK(buffer_desc != nullptr);
-  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
-
-  RequestContext* reader = buffer_desc->reader_;
-  if (buffer_desc->buffer_ != nullptr) {
-    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
-      // Buffers the were not allocated by DiskIoMgr don't need to be freed.
-      FreeBufferMemory(buffer_desc.get());
-    }
-    buffer_desc->buffer_ = nullptr;
-    num_buffers_in_readers_.Add(-1);
-    reader->num_buffers_in_reader_.Add(-1);
-  } else {
-    // A nullptr buffer means there was an error in which case there is no buffer
-    // to return.
-  }
-
-  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
-    // Need to close the scan range if returning the last buffer or the scan range
-    // has been cancelled (and the caller might never get the last buffer).
-    // Close() is idempotent so multiple cancelled buffers is okay.
-    buffer_desc->scan_range_->Close();
-  }
-}
-
-unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
-    RequestContext* reader, ScanRange* range, int64_t buffer_size) {
-  DCHECK_LE(buffer_size, max_buffer_size_);
-  DCHECK_GT(buffer_size, 0);
-  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
-  int idx = free_buffers_idx(buffer_size);
-  // Quantize buffer size to nearest power of 2 greater than the specified buffer size and
-  // convert to bytes
-  buffer_size = (1LL << idx) * min_buffer_size_;
-
-  // Track memory against the reader. This is checked the next time we start
-  // a read for the next reader in DiskIoMgr::GetNextScanRange().
-  DCHECK(reader->mem_tracker_ != nullptr);
-  reader->mem_tracker_->Consume(buffer_size);
-
-  uint8_t* buffer = nullptr;
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (free_buffers_[idx].empty()) {
-      num_allocated_buffers_.Add(1);
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
-      }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
-      }
-      // We already tracked this memory against the reader's MemTracker.
-      buffer = new uint8_t[buffer_size];
+vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) {
+  DCHECK_GE(max_bytes, min_buffer_size_);
+  vector<int64_t> buffer_sizes;
+  int64_t bytes_allocated = 0;
+  while (bytes_allocated < scan_range_len) {
+    int64_t bytes_remaining = scan_range_len - bytes_allocated;
+    // Either allocate a max-sized buffer or a smaller buffer to fit the rest of the
+    // range.
+    int64_t next_buffer_size;
+    if (bytes_remaining >= max_buffer_size_) {
+      next_buffer_size = max_buffer_size_;
    } else {
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
-      }
-      buffer = free_buffers_[idx].front();
-      free_buffers_[idx].pop_front();
-      free_buffer_mem_tracker_->Release(buffer_size);
-      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
+      next_buffer_size =
+          max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining));
    }
-  }
-
-  // Validate more invariants.
-  DCHECK(range != nullptr);
-  DCHECK(reader != nullptr);
-  DCHECK(buffer != nullptr);
-  return unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      this, reader, range, buffer, buffer_size, reader->mem_tracker_));
-}
-
-void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
-  unique_lock<mutex> lock(free_buffers_lock_);
-  int buffers_freed = 0;
-  int bytes_freed = 0;
-  // Free small-to-large to avoid retaining many small buffers and fragmenting memory.
-  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    deque<uint8_t*>* free_buffers = &free_buffers_[idx];
-    while (
-        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
-      uint8_t* buffer = free_buffers->front();
-      free_buffers->pop_front();
-      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
-      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
-      delete[] buffer;
-      free_buffer_mem_tracker_->Release(buffer_size);
-      num_allocated_buffers_.Add(-1);
-
-      ++buffers_freed;
-      bytes_freed += buffer_size;
-    }
-    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
-  }
-
-  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
-  }
-  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-    ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
-  }
-  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
-  }
-}
-
-void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
-  DCHECK(!desc->is_cached());
-  DCHECK(!desc->is_client_buffer());
-  uint8_t* buffer = desc->buffer_;
-  int64_t buffer_size = desc->buffer_len_;
-  int idx = free_buffers_idx(buffer_size);
-  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
-      << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
-      << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
-
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (!FLAGS_disable_mem_pools &&
-        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
-      // Poison buffers stored in cache.
-      ASAN_POISON_MEMORY_REGION(buffer, buffer_size);
-      free_buffers_[idx].push_back(buffer);
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
-      }
-      // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
-      // with a Release() call for the same buffer that could make consumption negative.
-      // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
-      // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
-      // pool's free lists, and remove these free lists.
-      free_buffer_mem_tracker_->Consume(buffer_size);
-    } else {
-      num_allocated_buffers_.Add(-1);
-      delete[] buffer;
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
-      }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
-      }
+    if (next_buffer_size + bytes_allocated > max_bytes) {
+      // Can't allocate the desired buffer size. Make sure to allocate at least one
+      // buffer.
+      if (bytes_allocated > 0) break;
+      next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes);
    }
+    DCHECK(BitUtil::IsPowerOf2(next_buffer_size)) << next_buffer_size;
+    buffer_sizes.push_back(next_buffer_size);
+    bytes_allocated += next_buffer_size;
  }
-
-  // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
-  desc->mem_tracker_->Release(buffer_size);
-  desc->buffer_ = nullptr;
+  return buffer_sizes;
 }
 
 // This function gets the next RequestRange to work on for this disk. It checks for
@@ -790,7 +549,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     disk_queue->request_contexts.pop_front();
     DCHECK(*request_context != nullptr);
     request_disk_state = &((*request_context)->disk_states_[disk_id]);
-    request_disk_state->IncrementRequestThreadAndDequeue();
+    request_disk_state->IncrementDiskThreadAndDequeue();
  }
 
   // NOTE: no locks were taken in between. We need to be careful about what state
@@ -798,27 +557,13 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
   // There are some invariants here. Only one disk thread can have the
   // same reader here (the reader is removed from the queue). There can be
   // other disk threads operating on this reader in other functions though.
-
-  // We just picked a reader. Before we may allocate a buffer on its behalf, check that
-  // it has not exceeded any memory limits (e.g. the query or process limit).
-  // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
-  // move these memory limit checks to GetFreeBuffer().
-  // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
-  // TODO: IMPALA-3209: we should not force a reader over its memory limit by
-  // pushing more buffers to it. Most readers can make progress and operate within
-  // a fixed memory limit.
-  if ((*request_context)->mem_tracker_ != nullptr
-      && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
-    (*request_context)->Cancel(Status::MemLimitExceeded());
-  }
-
   unique_lock<mutex> request_lock((*request_context)->lock_);
   VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
       << (*request_context)->DebugString();
 
   // Check if reader has been cancelled
   if ((*request_context)->state_ == RequestContext::Cancelled) {
-    request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
+    request_disk_state->DecrementDiskThread(request_lock, *request_context);
    continue;
  }
 
@@ -829,16 +574,16 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       !request_disk_state->unstarted_scan_ranges()->empty()) {
     // We don't have a range queued for this disk for what the caller should
     // read next. Populate that. We want to have one range waiting to minimize
-    // wait time in GetNextRange.
+    // wait time in GetNextUnstartedRange().
     ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
     (*request_context)->num_unstarted_scan_ranges_.Add(-1);
     (*request_context)->ready_to_start_ranges_.Enqueue(new_range);
     request_disk_state->set_next_scan_range_to_start(new_range);
 
     if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
-      // All the ranges have been started, notify everyone blocked on GetNextRange.
-      // Only one of them will get work so make sure to return nullptr to the other
-      // caller threads.
+      // All the ranges have been started, notify everyone blocked on
+      // GetNextUnstartedRange(). Only one of them will get work so make sure to return
+      // nullptr to the other caller threads.
      (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
    } else {
      (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
@@ -861,7 +606,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
 
   // There are no inflight ranges, nothing to do.
   if (request_disk_state->in_flight_ranges()->empty()) {
-    request_disk_state->DecrementRequestThread();
+    request_disk_state->DecrementDiskThread(request_lock, *request_context);
    continue;
  }
   DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
@@ -870,7 +615,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
 
   // Now that we've picked a request range, put the context back on the queue so
   // another thread can pick up another request range for this context.
-  request_disk_state->ScheduleContext(*request_context, disk_id);
+  request_disk_state->ScheduleContext(request_lock, *request_context, disk_id);
   DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
   return true;
 }
@@ -884,81 +629,62 @@ void DiskIoMgr::HandleWriteFinished(
   // Copy disk_id before running callback: the callback may modify write_range.
   int disk_id = write_range->disk_id_;
 
-  // Execute the callback before decrementing the thread count. Otherwise CancelContext()
-  // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
-  // IMPALA-1890.
+  // Execute the callback before decrementing the thread count. Otherwise
+  // RequestContext::Cancel() that waits for the disk ref count to be 0 will
+  // return, creating a race, e.g. see IMPALA-1890.
   // The status of the write does not affect the status of the writer context.
   write_range->callback_(write_status);
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
     RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
-    if (writer->state_ == RequestContext::Cancelled) {
-      state.DecrementRequestThreadAndCheckDone(writer);
-    } else {
-      state.DecrementRequestThread();
-    }
+    state.DecrementDiskThread(writer_lock, writer);
    --state.num_remaining_ranges();
  }
 }
 
 void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-    unique_ptr<BufferDescriptor> buffer) {
+    Status read_status, unique_ptr<BufferDescriptor> buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
-  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-  DCHECK_GT(state.num_threads_in_op(), 0);
-  DCHECK(buffer->buffer_ != nullptr);
-
-  if (reader->state_ == RequestContext::Cancelled) {
-    state.DecrementRequestThreadAndCheckDone(reader);
-    DCHECK(reader->Validate()) << endl << reader->DebugString();
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
-    buffer->buffer_ = nullptr;
-    ScanRange* scan_range = buffer->scan_range_;
-    scan_range->Cancel(reader->status_);
-    // Enqueue the buffer to use the scan range's buffer cleanup path.
-    scan_range->EnqueueBuffer(reader_lock, move(buffer));
-    return;
-  }
-
-  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK_GT(disk_state->num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != nullptr);
+  DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path.";
 
-  // Update the reader's scan ranges. There are a three cases here:
-  // 1. Read error
-  // 2. End of scan range
-  // 3. Middle of scan range
-  if (!buffer->status_.ok()) {
-    // Error case
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
-    buffer->buffer_ = nullptr;
-    buffer->eosr_ = true;
-    --state.num_remaining_ranges();
-    buffer->scan_range_->Cancel(buffer->status_);
-  } else if (buffer->eosr_) {
-    --state.num_remaining_ranges();
-  }
-
-  // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
-  // Store the state we need before calling EnqueueBuffer().
+  // After calling EnqueueReadyBuffer() below, it is no longer valid to read from buffer.
+  // Store the state we need before calling EnqueueReadyBuffer().
   bool eosr = buffer->eosr_;
+
+  // TODO: IMPALA-4249: it is safe to touch 'scan_range' until DecrementDiskThread() is
+  // called because all clients of DiskIoMgr keep ScanRange objects alive until they
+  // unregister their RequestContext.
   ScanRange* scan_range = buffer->scan_range_;
-  bool is_cached = buffer->is_cached();
-  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
-  if (eosr) {
-    // For cached buffers, we can't close the range until the cached buffer is returned.
-    // Close() is called from DiskIoMgr::ReturnBuffer().
-    if (!is_cached) scan_range->Close();
+  bool scan_range_done = eosr;
+  if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
+    DCHECK_EQ(reader->state_, RequestContext::Active);
+    // Read successfully - update the reader's scan ranges. There are two cases here:
+    // 1. End of scan range or cancelled scan range - don't need to reschedule.
+    // 2. Middle of scan range - need to schedule to read next buffer.
+    bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer));
+    if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range);
  } else {
-    if (queue_full) {
-      reader->blocked_ranges_.Enqueue(scan_range);
-    } else {
-      reader->ScheduleScanRange(scan_range);
-    }
+    // The scan range will be cancelled, either because we hit an error or because the
+    // request context was cancelled. The buffer is not needed - we must free it.
+    reader->FreeBuffer(buffer.get());
+    // Propagate 'read_status' to the scan range. If we are here because the context
+    // was cancelled, the scan range is already cancelled so we do not need to re-cancel
+    // it.
+    if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, read_status);
+    scan_range_done = true;
+  }
+  if (scan_range_done) {
+    scan_range->Close();
+    --disk_state->num_remaining_ranges();
  }
-  state.DecrementRequestThread();
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  disk_state->DecrementDiskThread(reader_lock, reader);
 }
 
 void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
@@ -974,14 +700,12 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   //   3. Perform the read or write as specified.
   // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    RequestContext* worker_context = nullptr;;
+    RequestContext* worker_context = nullptr;
     RequestRange* range = nullptr;
-
     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
       DCHECK(shut_down_);
-      break;
+      return;
    }
-
     if (range->request_type() == RequestType::READ) {
       ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
    } else {
@@ -989,12 +713,8 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
       Write(worker_context, static_cast<WriteRange*>(range));
    }
  }
-
-  DCHECK(shut_down_);
 }
 
-// This function reads the specified scan range associated with the
-// specified reader context and disk queue.
 void DiskIoMgr::ReadRange(
     DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -1002,86 +722,40 @@ void DiskIoMgr::ReadRange(
   unique_ptr<BufferDescriptor> buffer_desc;
   if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
     buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
-        range->client_buffer_.data, range->client_buffer_.len, nullptr));
+        range->client_buffer_.data, range->client_buffer_.len));
  } else {
-    // Need to allocate a buffer to read into.
-    int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
-    buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
-    if (buffer_desc == nullptr) return;
+    DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+        << "This code path does not handle other buffer types, i.e. HDFS cache"
+        << static_cast<int>(range->external_buffer_tag_);
+    buffer_desc = range->GetNextUnusedBufferForRange();
+    if (buffer_desc == nullptr) {
+      // No buffer available - the range will be rescheduled when a buffer is added.
+      unique_lock<mutex> reader_lock(reader->lock_);
+      reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, reader);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      return;
+    }
  }
-  reader->num_used_buffers_.Add(1);
 
   // No locks in this section. Only working on local vars. We don't want to hold a
   // lock across the read call.
-  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
-  if (buffer_desc->status_.ok()) {
+  Status read_status = range->Open(detail::is_file_handle_caching_enabled());
+  if (read_status.ok()) {
     // Update counters.
-    if (reader->active_read_thread_counter_) {
-      reader->active_read_thread_counter_->Add(1L);
-    }
-    if (reader->disks_accessed_bitmap_) {
-      int64_t disk_bit = 1LL << disk_queue->disk_id;
-      reader->disks_accessed_bitmap_->BitOr(disk_bit);
-    }
+    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
+    COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id);
 
-    buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+    read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
         &buffer_desc->len_, &buffer_desc->eosr_);
    buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
-
-    if (reader->bytes_read_counter_ != nullptr) {
-      COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
-    }
-
+    COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_);
    COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
-    if (reader->active_read_thread_counter_) {
-      reader->active_read_thread_counter_->Add(-1L);
-    }
+    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L);
  }
 
   // Finished read, update reader/disk based on the results
-  HandleReadFinished(disk_queue, reader, move(buffer_desc));
-}
-
-unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
-    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
-    int64_t buffer_size) {
-  DCHECK(reader->mem_tracker_ != nullptr);
-  bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-  if (!enough_memory) {
-    // Low memory, GC all the buffers and try again.
-    GcIoBuffers();
-    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-  }
-
-  if (!enough_memory) {
-    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
-    unique_lock<mutex> reader_lock(reader->lock_);
-
-    // Just grabbed the reader lock, check for cancellation.
-    if (reader->state_ == RequestContext::Cancelled) {
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      state.DecrementRequestThreadAndCheckDone(reader);
-      range->Cancel(reader->status_);
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      return nullptr;
-    }
-
-    if (!range->ready_buffers_.empty()) {
-      // We have memory pressure and this range doesn't need another buffer
-      // (it already has one queued). Skip this range and pick it up later.
-      range->blocked_on_queue_ = true;
-      reader->blocked_ranges_.Enqueue(range);
-      state.DecrementRequestThread();
-      return nullptr;
-    } else {
-      // We need to get a buffer anyway since there are none queued. The query
-      // is likely to fail due to mem limits but there's nothing we can do about that
-      // now.
-    }
-  }
-  unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size);
-  DCHECK(buffer_desc != nullptr);
-  return buffer_desc;
+  HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
 }
 
 void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
@@ -1113,30 +787,14 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
 #endif
 
   RETURN_IF_ERROR(local_file_system_->Fwrite(file_handle, write_range));
-
-  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
-    ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
-  }
-
+  ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
   return Status::OK();
 }
 
-int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
-  int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
-  int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
-  DCHECK_GE(idx, 0);
-  DCHECK_LT(idx, free_buffers_.size());
-  return idx;
-}
-
 Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
   unique_lock<mutex> writer_lock(writer->lock_);
-
-  if (writer->state_ == RequestContext::Cancelled) {
-    DCHECK(!writer->status_.ok());
-    return writer->status_;
-  }
-
-  writer->AddRequestRange(write_range, false);
+  if (writer->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+  writer->AddRangeToDisk(writer_lock, write_range, ScheduleMode::IMMEDIATELY);
  return Status::OK();
 }
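
The buffer-sizing policy introduced by ChooseBufferSizes() above is easiest to see with a
worked example. The following standalone sketch (plain C++, not Impala code) mirrors that
loop; the two rounding helpers and the constants in main() are illustrative stand-ins for
BitUtil and the configured min/max buffer sizes, whose real values depend on configuration.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-ins for BitUtil::RoundUpToPowerOfTwo / RoundDownToPowerOfTwo.
int64_t RoundUpToPowerOfTwo(int64_t v) {
  int64_t p = 1;
  while (p < v) p <<= 1;
  return p;
}

int64_t RoundDownToPowerOfTwo(int64_t v) {
  int64_t p = RoundUpToPowerOfTwo(v);
  return p == v ? p : p >> 1;
}

// Same policy as DiskIoMgr::ChooseBufferSizes(): hand out max-sized buffers
// until the tail of the range fits in a smaller power-of-two buffer, while
// never exceeding the 'max_bytes' reservation cap.
std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes,
    int64_t min_buffer_size, int64_t max_buffer_size) {
  std::vector<int64_t> buffer_sizes;
  int64_t bytes_allocated = 0;
  while (bytes_allocated < scan_range_len) {
    int64_t bytes_remaining = scan_range_len - bytes_allocated;
    int64_t next = bytes_remaining >= max_buffer_size
        ? max_buffer_size
        : std::max(min_buffer_size, RoundUpToPowerOfTwo(bytes_remaining));
    if (next + bytes_allocated > max_bytes) {
      if (bytes_allocated > 0) break;  // Respect the cap...
      next = RoundDownToPowerOfTwo(max_bytes);  // ...but always give one buffer.
    }
    buffer_sizes.push_back(next);
    bytes_allocated += next;
  }
  return buffer_sizes;
}

int main() {
  // A 20MB scan range with a 24MB cap and 8KB/8MB buffer bounds yields
  // [8MB, 8MB, 4MB]: two max-sized buffers plus a power-of-two tail buffer.
  for (int64_t size : ChooseBufferSizes(20 << 20, 24 << 20, 8 << 10, 8 << 20)) {
    std::cout << size << "\n";
  }
  return 0;
}
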
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h index 52d6993..17aa211 100644 --- a/be/src/runtime/io/disk-io-mgr.h +++ b/be/src/runtime/io/disk-io-mgr.h @@ -30,6 +30,7 @@ #include "common/hdfs.h" #include "common/object-pool.h" #include "common/status.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/io/handle-cache.h" #include "runtime/io/local-file-system.h" #include "runtime/io/request-ranges.h" @@ -42,21 +43,19 @@ namespace impala { -class MemTracker; - namespace io { /// Manager object that schedules IO for all queries on all disks and remote filesystems /// (such as S3). Each query maps to one or more RequestContext objects, each of which /// has its own queue of scan ranges and/or write ranges. -// +/// /// The API splits up requesting scan/write ranges (non-blocking) and reading the data /// (blocking). The DiskIoMgr has worker threads that will read from and write to /// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to /// keep all disks and all cores as busy as possible. -// +/// /// All public APIs are thread-safe. It is not valid to call any of the APIs after /// UnregisterContext() returns. -// +/// /// For Readers: /// We can model this problem as a multiple producer (threads for each disk), multiple /// consumer (scan ranges) problem. There are multiple queues that need to be @@ -68,84 +67,102 @@ namespace io { /// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller /// asks the IoMgr for the next range to process. The IoMgr then selects the best range /// to read based on disk activity and begins reading and queuing buffers for that range. -/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries -/// that have multiple scan nodes shouldn't have more 'turns'. -// +/// /// For Writers: /// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a /// per-disk queue. After the write is complete, a callback in WriteRange is invoked. /// No memory is allocated within IoMgr for writes and no copies are made. It is the /// responsibility of the client to ensure that the data to be written is valid and that /// the file to be written to exists until the callback is invoked. -// -/// The IoMgr provides three key APIs. -/// 1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that -/// will eventually need to be read. -/// 2. GetNextRange: returns to the caller the next scan range it should process. -/// This is based on disk load. This also begins reading the data in this scan -/// range. This is blocking. -/// 3. ScanRange::GetNext: returns the next buffer for this range. This is blocking. -// +/// +/// There are several key methods for scanning data with the IoMgr. +/// 1. StartScanRange(): adds range to the IoMgr to start immediately. +/// 2. AddScanRanges(): adds ranges to the IoMgr that the reader wants to scan, but does +/// not start them until GetNextUnstartedRange() is called. +/// 3. GetNextUnstartedRange(): returns to the caller the next scan range it should +/// process. +/// 4. ScanRange::GetNext(): returns the next buffer for this range, blocking until +/// data is available. +/// /// The disk threads do not synchronize with each other. The readers and writers don't /// synchronize with each other. 
There is a lock and condition variable for each request /// context queue and each disk queue. /// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock /// before the disk lock. -// +/// /// Scheduling: If there are multiple request contexts with work for a single disk, the /// request contexts are scheduled in round-robin order. Multiple disk threads can /// operate on the same request context. Exactly one request range is processed by a -/// disk thread at a time. If there are multiple scan ranges scheduled via -/// GetNextRange() for a single context, these are processed in round-robin order. +/// disk thread at a time. If there are multiple scan ranges scheduled for a single +/// context, these are processed in round-robin order. /// If there are multiple scan and write ranges for a disk, a read is always followed /// by a write, and a write is followed by a read, i.e. reads and writes alternate. /// If multiple write ranges are enqueued for a single disk, they will be processed /// by the disk threads in order, but may complete in any order. No guarantees are made /// on ordering of writes across disks. -// -/// Resource Management: effective resource management in the IoMgr is key to good -/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU, -/// spinning up too many threads causes thrashing. -/// Memory usage in the IoMgr comes from queued read buffers. If we queue the minimum -/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't -/// limit the queue, then it possible we end up queueing the entire data set (i.e. CPU -/// is slower than disks) and run out of memory. -/// For both CPU and memory, we want to model the machine as having a fixed amount of -/// resources. If a single query is running, it should saturate either CPU or Disk -/// as well as using as little memory as possible. With multiple queries, each query -/// should get less CPU. In that case each query will need fewer queued buffers and -/// therefore have less memory usage. -// -/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange -/// API which will return the next scan range the caller should process. The caller -/// can call this from the desired number of reading threads. Once a scan range -/// has been returned via GetNextRange, the IoMgr will start to buffer reads for -/// that range and it is expected the caller will pull those buffers promptly. For -/// example, if the caller would like to have 1 scanner thread, the read loop -/// would look like: +/// +/// Resource Management: the IoMgr is designed to share the available disk I/O capacity +/// between many clients and to help use the available I/O capacity efficiently. The IoMgr +/// interfaces are designed to let clients manage their own CPU and memory usage while the +/// IoMgr manages the allocation of the I/O capacity of different I/O devices to scan +/// ranges of different clients. +/// +/// IoMgr clients may want to work on multiple scan ranges at a time to maximize CPU and +/// I/O utilization. Clients can call GetNextUnstartedRange() to start as many concurrent +/// scan ranges as required, e.g. from each parallel scanner thread. Once a scan range has +/// been returned via GetNextUnstartedRange(), the caller must allocate any memory needed +/// for buffering reads, after which the IoMgr wil start to fill the buffers with data +/// while the caller concurrently consumes and processes the data. 
For example, the logic +/// in a scanner thread might look like: /// while (more_ranges) -/// range = GetNextRange() +/// range = GetNextUnstartedRange() /// while (!range.eosr) /// buffer = range.GetNext() -/// To have multiple reading threads, the caller would simply spin up the threads -/// and each would process the loops above. -// -/// To control the number of IO buffers, each scan range has a limit of two queued -/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity, -/// the IoMgr will no longer read for that scan range until the caller has processed -/// a buffer. Assuming the client returns each buffer before requesting the next one -/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per -/// scan range. -// +/// +/// Note that the IoMgr rather than the client is responsible for choosing which scan +/// range to process next, which allows optimizations like distributing load across disks. +/// /// Buffer Management: -/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller, -/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the -/// caller when constructing the scan range. +/// Buffers for reads are either a) allocated on behalf of the caller with +/// AllocateBuffersForRange() ("IoMgr-allocated"), b) cached HDFS buffers if the scan +/// range was read from the HDFS cache, or c) a client buffer, large enough to fit the +/// whole scan range's data, that is provided by the caller when constructing the +/// scan range. +/// +/// All three kinds of buffers are wrapped in BufferDescriptors before returning to the +/// caller. The caller must always call ReturnBuffer() on the buffer descriptor to allow +/// recycling of the buffer memory and to release any resources associated with the buffer +/// or scan range. /// -/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors -/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer -/// descriptor to allow recycling of the associated buffer (if there is an -/// IoMgr-allocated or HDFS cached buffer). +/// In case a), ReturnBuffer() may re-enqueue the buffer for GetNext() to return again if +/// needed. E.g. if 24MB of buffers were allocated to read a 64MB scan range, each buffer +/// must be returned multiple times. Callers must be careful to call ReturnBuffer() with +/// the previous buffer returned from the range before calling before GetNext() so that +/// at least one buffer is available for the I/O mgr to read data into. Calling GetNext() +/// when the scan range has no buffers to read data into causes a resource deadlock. +/// NB: if the scan range was allocated N buffers, then it's always ok for the caller +/// to hold onto N - 1 buffers, but currently the IoMgr doesn't give the caller a way +/// to determine the value of N. +/// +/// If the caller wants to maximize I/O throughput, it can give the range enough memory +/// for 3 max-sized buffers per scan range. Having two queued buffers (plus the buffer +/// that is currently being processed by the client) gives good performance in most +/// scenarios: +/// 1. If the consumer is consuming data faster than we can read from disk, then the +/// queue will be empty most of the time because the buffer will be immediately +/// pulled off the queue as soon as it is added. There will always be an I/O request +/// in the disk queue to maximize I/O throughput, which is the bottleneck in this +/// case. 
+/// 2. If we can read from disk faster than the consumer is consuming data, the queue +/// will fill up and there will always be a buffer available for the consumer to +/// read, so the consumer will not block and we maximize consumer throughput, which +/// is the bottleneck in this case. +/// 3. If the consumer is consuming data at approximately the same rate as we are +/// reading from disk, then the steady state is that the consumer is processing one +/// buffer and one buffer is in the disk queue. The additional buffer can absorb +/// bursts where the producer runs faster than the consumer or the consumer runs +/// faster than the producer without blocking either the producer or consumer. +/// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE. /// /// Caching support: /// Scan ranges contain metadata on whether or not it is cached on the DN. In that @@ -161,13 +178,13 @@ namespace io { /// - HDFS will time us out if we hold onto the mlock for too long /// - Holding the lock prevents uncaching this file due to a caching policy change. /// Therefore, we only issue the cached read when the caller is ready to process the -/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that -/// there will be a CPU available to process the buffer and any throttling we do with +/// range (GetNextUnstartedRange()) instead of when the ranges are issued. This guarantees +/// that there will be a CPU available to process the buffer and any throttling we do with /// the number of scanner threads properly controls the amount of files we mlock. /// With cached scan ranges, we cannot close the scan range until the cached buffer /// is returned (HDFS does not allow this). We therefore need to defer the close until /// the cached buffer is returned (ReturnBuffer()). -// +/// /// Remote filesystem support (e.g. S3): /// Remote filesystems are modeled as "remote disks". That is, there is a seperate disk /// queue for each supported remote filesystem type. In order to maximize throughput, @@ -176,12 +193,13 @@ namespace io { /// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can /// be CPU bottlenecked especially if not enough I/O threads for these queues are /// started. -// +/// +/// TODO: We should implement more sophisticated resource management. Currently readers +/// are the unit of scheduling and we attempt to distribute IOPS between them. Instead +/// it would be better to have policies based on queries, resource pools, etc. /// TODO: IoMgr should be able to request additional scan ranges from the coordinator /// to help deal with stragglers. -/// TODO: look into using a lock free queue -/// TODO: simplify the common path (less locking, memory allocations). -// +/// /// Structure of the Implementation: /// - All client APIs are defined in this file, request-ranges.h and request-context.h. /// Clients can include only the files that they need. @@ -204,10 +222,12 @@ class DiskIoMgr : public CacheLineAligned { /// disk. This is also the max queue depth. /// - threads_per_solid_state_disk: number of read threads to create per solid state /// disk. This is also the max queue depth. - /// - min_buffer_size: minimum io buffer size (in bytes) - /// - max_buffer_size: maximum io buffer size (in bytes). Also the max read size. + /// - min_buffer_size: minimum io buffer size (in bytes). Will be rounded down to the + // nearest power-of-two. + /// - max_buffer_size: maximum io buffer size (in bytes). Will be rounded up to the + /// nearest power-of-two. 
Also the max read size. DiskIoMgr(int num_disks, int threads_per_rotational_disk, - int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size); + int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size); /// Create DiskIoMgr with default configs. DiskIoMgr(); @@ -217,16 +237,13 @@ class DiskIoMgr : public CacheLineAligned { virtual ~DiskIoMgr(); /// Initialize the IoMgr. Must be called once before any of the other APIs. - Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT; + Status Init() WARN_UNUSED_RESULT; + /// Allocates tracking structure for a request context. /// Register a new request context and return it to the caller. The caller must call /// UnregisterContext() for each context. - /// reader_mem_tracker: Is non-null only for readers. IO buffers - /// used for this reader will be tracked by this. If the limit is exceeded - /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via - /// GetNext(). - std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker); + std::unique_ptr<RequestContext> RegisterContext(); /// Unregisters context from the disk IoMgr by first cancelling it then blocking until /// all references to the context are removed from I/O manager internal data structures. @@ -236,50 +253,60 @@ class DiskIoMgr : public CacheLineAligned { /// up. void UnregisterContext(RequestContext* context); - /// This function cancels the context asychronously. All outstanding requests - /// are aborted and tracking structures cleaned up. This does not need to be - /// called if the context finishes normally. - /// This will also fail any outstanding GetNext()/Read requests. - void CancelContext(RequestContext* context); - - /// Adds the scan ranges to the queues. This call is non-blocking. The caller must - /// not deallocate the scan range pointers before UnregisterContext(). - /// If schedule_immediately, the ranges are immediately put on the read queue - /// (i.e. the caller should not/cannot call GetNextRange for these ranges). - /// This can be used to do synchronous reads as well as schedule dependent ranges, - /// as in the case for columnar formats. - Status AddScanRanges(RequestContext* reader, - const std::vector<ScanRange*>& ranges, - bool schedule_immediately = false) WARN_UNUSED_RESULT; - Status AddScanRange(RequestContext* reader, ScanRange* range, - bool schedule_immediately = false) WARN_UNUSED_RESULT; + /// Adds the scan ranges to reader's queues, but does not start scheduling it. The range + /// can be scheduled by a thread calling GetNextUnstartedRange(). This call is + /// non-blocking. The caller must not deallocate the scan range pointers before + /// UnregisterContext(). 'ranges' must not be empty. + Status AddScanRanges( + RequestContext* reader, const std::vector<ScanRange*>& ranges) WARN_UNUSED_RESULT; + + /// Adds the scan range to the queues, as with AddScanRanges(), but immediately + /// start scheduling the scan range. This can be used to do synchronous reads as well + /// as schedule dependent ranges, e.g. for columnar formats. This call is non-blocking. + /// The caller must not deallocate the scan range pointers before UnregisterContext(). + /// + /// If this returns true in '*needs_buffers', the caller must then call + /// AllocateBuffersForRange() to add buffers for the data to be read into before the + /// range can be scheduled. 
  /// Add a WriteRange for the writer. This is non-blocking and schedules the context
  /// on the IoMgr disk queue. Does not create any files.
  Status AddWriteRange(
      RequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;

-  /// Returns the next unstarted scan range for this reader. When the range is returned,
-  /// the disk threads in the IoMgr will already have started reading from it. The
-  /// caller is expected to call ScanRange::GetNext on the returned range.
-  /// If there are no more unstarted ranges, nullptr is returned.
-  /// This call is blocking.
-  Status GetNextRange(RequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
-
-  /// Reads the range and returns the result in buffer.
-  /// This behaves like the typical synchronous read() api, blocking until the data
-  /// is read. This can be called while there are outstanding ScanRanges and is
-  /// thread safe. Multiple threads can be calling Read() per reader at a time.
-  /// range *cannot* have already been added via AddScanRanges.
-  /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller
-  /// than max_read_buffer_size()) or if reading into a client-provided buffer.
-  Status Read(RequestContext* reader, ScanRange* range,
-      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
-
-  /// Returns the buffer to the IoMgr. This must be called for every buffer
-  /// returned by GetNext()/Read() that did not return an error. This is non-blocking.
-  /// After calling this, the buffer descriptor is invalid and cannot be accessed.
-  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
+  /// Tries to get an unstarted scan range that was added to 'reader' with
+  /// AddScanRanges(). On success, returns OK and sets '*range' to the range. If
+  /// 'reader' was cancelled, returns CANCELLED. If another error is encountered,
+  /// an error status is returned. Otherwise, if no error or cancellation was
+  /// encountered and there are no unstarted ranges left for 'reader', returns OK
+  /// and sets '*range' to nullptr.
+  ///
+  /// If '*needs_buffers' is returned as true, the caller must call
+  /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+  /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr will
+  /// asynchronously read the data for the range and the caller can call
+  /// ScanRange::GetNext() to read the data.
+  Status GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
+      bool* needs_buffers) WARN_UNUSED_RESULT;
+
+  /// Allocates up to 'max_bytes' of buffer space to read the data from 'range' into
+  /// and schedules the range. Called after StartScanRange() or GetNextUnstartedRange()
+  /// returns *needs_buffers=true.
+  ///
+  /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >=
+  /// min_buffer_size() so that at least one buffer can be allocated. The caller
+  /// must ensure that 'bp_client' has at least 'max_bytes' unused reservation. Returns
+  /// OK if the buffers were successfully allocated and the range was scheduled.
+  ///
+  /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size()
+  /// will typically maximize I/O throughput. See the "Buffer Management" section of
+  /// the class comment for explanation.
+  Status AllocateBuffersForRange(RequestContext* reader,
+      BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes);
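The multi-range path works the same way, looping on GetNextUnstartedRange()
until it hands back nullptr. Again a stub sketch rather than real Impala code;
the stub omits the BufferPool::ClientHandle* argument and models the unstarted
queue as a plain vector:

    #include <cstdint>
    #include <vector>

    struct Status { bool ok() const { return true; } };
    struct ScanRange {};
    struct RequestContext {
      std::vector<ScanRange*> unstarted;  // ranges added via AddScanRanges()
    };

    struct DiskIoMgrSketch {
      // Returns OK and *range=nullptr once all unstarted ranges are handed out.
      Status GetNextUnstartedRange(
          RequestContext* reader, ScanRange** range, bool* needs_buffers) {
        if (reader->unstarted.empty()) {
          *range = nullptr;
        } else {
          *range = reader->unstarted.back();  // hand-out order is irrelevant here
          reader->unstarted.pop_back();
          *needs_buffers = true;  // assume no client-provided or cached buffer
        }
        return Status();
      }
      Status AllocateBuffersForRange(RequestContext*, ScanRange*, int64_t) {
        return Status();
      }
      int64_t max_buffer_size() const { return 8LL * 1024 * 1024; }
    };

    int main() {
      DiskIoMgrSketch io_mgr;
      ScanRange r1, r2;
      RequestContext reader{{&r1, &r2}};
      while (true) {
        ScanRange* range = nullptr;
        bool needs_buffers = false;
        Status status =
            io_mgr.GetNextUnstartedRange(&reader, &range, &needs_buffers);
        if (!status.ok() || range == nullptr) break;  // error, cancelled or done
        if (needs_buffers) {
          // Reserve enough for the "ideal" three max-sized buffers.
          status = io_mgr.AllocateBuffersForRange(
              &reader, range, 3 * io_mgr.max_buffer_size());
          if (!status.ok()) break;
        }
        // ... process the range via ScanRange::GetNext() ...
      }
      return 0;
    }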
See the "Buffer Management" section of + /// the class comment for explanation. + Status AllocateBuffersForRange(RequestContext* reader, + BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes); /// Determine which disk queue this file should be assigned to. Returns an index into /// disk_queues_. The disk_id is the volume ID for the local disk that holds the @@ -287,32 +314,8 @@ class DiskIoMgr : public CacheLineAligned { /// co-located with the datanode for this file. int AssignQueue(const char* file, int disk_id, bool expected_local); - /// TODO: The functions below can be moved to RequestContext. - /// Returns the current status of the context. - Status context_status(RequestContext* context) const WARN_UNUSED_RESULT; - - void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*); - void set_read_timer(RequestContext*, RuntimeProfile::Counter*); - void set_open_file_timer(RequestContext*, RuntimeProfile::Counter*); - void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*); - void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*); - - int64_t queue_size(RequestContext* reader) const; - int64_t bytes_read_local(RequestContext* reader) const; - int64_t bytes_read_short_circuit(RequestContext* reader) const; - int64_t bytes_read_dn_cache(RequestContext* reader) const; - int num_remote_ranges(RequestContext* reader) const; - int64_t unexpected_remote_bytes(RequestContext* reader) const; - int cached_file_handles_hit_count(RequestContext* reader) const; - int cached_file_handles_miss_count(RequestContext* reader) const; - - /// Returns the read throughput across all readers. - /// TODO: should this be a sliding window? This should report metrics for the - /// last minute, hour and since the beginning. - int64_t GetReadThroughput(); - - /// Returns the maximum read buffer size - int max_read_buffer_size() const { return max_buffer_size_; } + int64_t min_buffer_size() const { return min_buffer_size_; } + int64_t max_buffer_size() const { return max_buffer_size_; } /// Returns the total number of disk queues (both local and remote). int num_total_disks() const { return disk_queues_.size(); } @@ -365,10 +368,6 @@ class DiskIoMgr : public CacheLineAligned { Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, RequestContext* reader, CachedHdfsFileHandle** fid); - /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if - /// 'bytes_to_free' is -1. - void GcIoBuffers(int64_t bytes_to_free = -1); - // Function to change the underlying LocalFileSystem object used for disk I/O. // DiskIoMgr will also take responsibility of the received LocalFileSystem pointer. // It is only for testing purposes to use a fault injected version of LocalFileSystem. @@ -376,25 +375,6 @@ class DiskIoMgr : public CacheLineAligned { local_file_system_ = std::move(fs); } - /// The maximum number of ready buffers that can be queued in a scan range. Having two - /// queued buffers (plus the buffer that is returned to the client) gives good - /// performance in most scenarios: - /// 1. If the consumer is consuming data faster than we can read from disk, then the - /// queue will be empty most of the time because the buffer will be immediately - /// pulled off the queue as soon as it is added. There will always be an I/O request - /// in the disk queue to maximize I/O throughput, which is the bottleneck in this - /// case. - /// 2. 
-  /// 2. If we can read from disk faster than the consumer is consuming data, the queue
-  /// will fill up and there will always be a buffer available for the consumer to
-  /// read, so the consumer will not block and we maximize consumer throughput, which
-  /// is the bottleneck in this case.
-  /// 3. If the consumer is consuming data at approximately the same rate as we are
-  /// reading from disk, then the steady state is that the consumer is processing one
-  /// buffer and one buffer is in the disk queue. The additional buffer can absorb
-  /// bursts where the producer runs faster than the consumer or the consumer runs
-  /// faster than the producer without blocking either the producer or consumer.
-  static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
-
  /// "Disk" queue offsets for remote accesses. Offset 0 corresponds to
  /// disk ID (i.e. disk_queue_ index) of num_local_disks().
  enum {
@@ -404,6 +384,10 @@
    REMOTE_NUM_DISKS
  };

+  /// The ideal number of max-sized buffers per scan range to maximize throughput.
+  /// See "Buffer Management" in the class comment for explanation.
+  static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;
+
 private:
  friend class BufferDescriptor;
  friend class RequestContext;
@@ -412,16 +396,9 @@
  struct DiskQueue;

  friend class DiskIoMgrTest_Buffers_Test;
+  friend class DiskIoMgrTest_BufferSizeSelection_Test;
  friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;

-  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
-  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
-
-  /// Memory tracker for I/O buffers where the RequestContext has no MemTracker.
-  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't
-  /// provide a MemTracker.
-  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
-
  // Handles the low level I/O functionality.
  std::unique_ptr<LocalFileSystem> local_file_system_;
@@ -434,10 +411,10 @@
  const int num_io_threads_per_solid_state_disk_;

  /// Maximum read size. This is also the maximum size of each allocated buffer.
-  const int max_buffer_size_;
+  const int64_t max_buffer_size_;

-  /// The minimum size of each read buffer.
-  const int min_buffer_size_;
+  /// The minimum size of each read buffer. Must be >= BufferPool::min_buffer_len().
+  const int64_t min_buffer_size_;

  /// Thread group containing all the worker threads.
  ThreadGroup disk_thread_group_;
@@ -455,28 +432,6 @@
  /// Total time spent in hdfs reading
  RuntimeProfile::Counter read_timer_;

-  /// Protects free_buffers_
-  boost::mutex free_buffers_lock_;
-
-  /// Free buffers that can be handed out to clients. There is one list for each buffer
-  /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The
-  /// maximum buffer size is max_buffer_size_, so the maximum index is
-  /// Log2(max_buffer_size_ / min_buffer_size_).
-  //
-  /// E.g. if min_buffer_size_ = 1024 bytes:
-  /// free_buffers_[0]  => list of free buffers with size 1024 B
-  /// free_buffers_[1]  => list of free buffers with size 2048 B
-  /// free_buffers_[10] => list of free buffers with size 1 MB
-  /// free_buffers_[13] => list of free buffers with size 8 MB
-  /// free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
-  std::vector<std::deque<uint8_t*>> free_buffers_;
-
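For reference, the free-list indexing scheme deleted above mapped each
power-of-two buffer size to Log2(size / min_buffer_size_). A standalone sketch
of that computation, reproducing the worked examples from the removed comment
(this only illustrates the old scheme; it is not part of the patch):

    #include <cassert>
    #include <cstdint>

    // With power-of-two buffer sizes, a buffer of size 2^n * min_buffer_size
    // lived in free_buffers_[n].
    static int FreeBuffersIdx(int64_t buffer_size, int64_t min_buffer_size) {
      int idx = 0;
      // Log2 of the buffer size in units of min_buffer_size.
      while ((min_buffer_size << idx) < buffer_size) ++idx;
      return idx;
    }

    int main() {
      const int64_t kMin = 1024;  // the 1024-byte example from the comment
      assert(FreeBuffersIdx(1024, kMin) == 0);              // 1 KB -> index 0
      assert(FreeBuffersIdx(2048, kMin) == 1);              // 2 KB -> index 1
      assert(FreeBuffersIdx(1024 * 1024, kMin) == 10);      // 1 MB -> index 10
      assert(FreeBuffersIdx(8 * 1024 * 1024, kMin) == 13);  // 8 MB -> index 13
      return 0;
    }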
-  /// Total number of allocated buffers, used for debugging.
-  AtomicInt32 num_allocated_buffers_;
-
-  /// Total number of buffers in readers
-  AtomicInt32 num_buffers_in_readers_;
-
  /// Per disk queues. This is static and created once at Init() time. One queue is
  /// allocated for each local disk on the system and for each remote filesystem type.
  /// It is indexed by disk id.
@@ -492,23 +447,6 @@
  // handles are closed.
  FileHandleCache file_handle_cache_;

-  /// Returns the index into free_buffers_ for a given buffer size
-  int free_buffers_idx(int64_t buffer_size);
-
-  /// Returns a buffer to read into with size between 'buffer_size' and
-  /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the
-  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
-  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
-  /// The buffer memory is tracked against reader's mem tracker, or
-  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
-  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
-      RequestContext* reader, ScanRange* range, int64_t buffer_size);
-
-  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either
-  /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
-  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
-  void FreeBufferMemory(BufferDescriptor* desc);
-
  /// Disk worker thread loop. This function retrieves the next range to process on
  /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
  /// There can be multiple threads per disk running this loop.
@@ -522,10 +460,12 @@
  bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
      RequestContext** request_context);

-  /// Updates disk queue and reader state after a read is complete. The read result
-  /// is captured in the buffer descriptor.
+  /// Updates disk queue and reader state after a read is complete. If the read
+  /// was successful, 'read_status' is OK and 'buffer' contains the result of the
+  /// read. If the read failed with an error, 'read_status' contains the error and
+  /// 'buffer' has the buffer that was meant to hold the result of the read.
  void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-      std::unique_ptr<BufferDescriptor> buffer);
+      Status read_status, std::unique_ptr<BufferDescriptor> buffer);

  /// Invokes write_range->callback_ after the range has been written and
  /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
@@ -548,15 +488,14 @@
  /// Does not open or close the file that is written.
  Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;

-  /// Reads the specified scan range and calls HandleReadFinished when done.
+  /// Reads the specified scan range and calls HandleReadFinished() when done. If no
+  /// buffer is available to read the range's data into, the read cannot proceed: the
+  /// range becomes blocked and this function returns without doing I/O.
  void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);

-  /// Try to allocate the next buffer for the scan range, returning the new buffer
-  /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
-  /// If there is memory pressure and buffers are already queued, adds the range
-  /// to the blocked ranges and returns nullptr.
-  std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue,
-      RequestContext* reader, ScanRange* range, int64_t buffer_size);
+  /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range
+  /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated.
+  std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes);
 };
 }
 }
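The patch only declares ChooseBufferSizes(); the diff does not show its body.
One plausible implementation consistent with the documented constraints
(power-of-two sizes, total at most 'max_bytes', at least one buffer) is sketched
below. This is an assumption for illustration, not the committed code:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    static int64_t RoundUpToPowerOfTwo(int64_t v) {
      int64_t r = 1;
      while (r < v) r <<= 1;
      return r;
    }
    static int64_t RoundDownToPowerOfTwo(int64_t v) {
      int64_t r = 1;
      while ((r << 1) <= v) r <<= 1;
      return r;
    }

    // Greedily cover the range with max-sized buffers, then one power-of-two
    // buffer just big enough for the tail, staying within 'max_bytes'.
    std::vector<int64_t> ChooseBufferSizesSketch(int64_t scan_range_len,
        int64_t max_bytes, int64_t min_buffer_size, int64_t max_buffer_size) {
      std::vector<int64_t> sizes;
      int64_t allocated = 0;
      while (allocated < scan_range_len) {
        int64_t remaining = scan_range_len - allocated;
        int64_t next = remaining >= max_buffer_size
            ? max_buffer_size
            : std::max(min_buffer_size, RoundUpToPowerOfTwo(remaining));
        if (allocated + next > max_bytes) {
          if (!sizes.empty()) break;  // budget exhausted; stop adding buffers
          next = RoundDownToPowerOfTwo(max_bytes);  // always allocate >= 1 buffer
        }
        sizes.push_back(next);
        allocated += next;
      }
      return sizes;
    }

    int main() {
      // A 100MB range with a 24MB budget and 8MB max buffers yields the
      // three max-sized buffers the class comment calls ideal.
      auto sizes = ChooseBufferSizesSketch(
          100LL << 20, 24LL << 20, 8 * 1024, 8LL << 20);
      return sizes.size() == 3 ? 0 : 1;
    }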
