http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 0d2afe2..8ff6609 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/io/disk-io-mgr.h"
-
 #include "common/global-flags.h"
-#include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/handle-cache.inline.h"
 
@@ -54,8 +52,6 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads 
per disk");
 static const int THREADS_PER_ROTATIONAL_DISK = 1;
 static const int THREADS_PER_SOLID_STATE_DISK = 8;
 
-const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE;
-
 // The maximum number of the threads per rotational disk is also the max queue 
depth per
 // rotational disk.
 static const string num_io_threads_per_rotational_disk_help_msg = 
Substitute("Number of "
@@ -126,6 +122,13 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, 
"Maximum time, in seconds,
 DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used 
by the "
     "file handle cache.");
 
+// The IoMgr is able to run with a wide range of memory usage. If a query has memory
+// remaining less than this value, the IoMgr will stop all buffering regardless of the
+// current queue size.
+static const int LOW_MEMORY = 64 * 1024 * 1024;
+
+const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
+
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
 namespace detail {
@@ -152,6 +155,34 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+    int64_t buffer_len, MemTracker* mem_tracker)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    mem_tracker_(mem_tracker),
+    scan_range_(scan_range),
+    buffer_(buffer),
+    buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+  DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
+      mem_tracker == nullptr);
+}
+
+void BufferDescriptor::TransferOwnership(MemTracker* dst) {
+  DCHECK(dst != nullptr);
+  DCHECK(!is_client_buffer());
+  // Memory of cached buffers is not tracked against a tracker.
+  if (is_cached()) return;
+  DCHECK(mem_tracker_ != nullptr);
+  dst->Consume(buffer_len_);
+  mem_tracker_->Release(buffer_len_);
+  mem_tracker_ = dst;
+}
+
 WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback 
callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
@@ -192,8 +223,8 @@ DiskIoMgr::DiskIoMgr() :
     num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
         FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
         THREADS_PER_SOLID_STATE_DISK)),
-    max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)),
-    min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)),
+    max_buffer_size_(FLAGS_read_size),
+    min_buffer_size_(FLAGS_min_buffer_size),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
@@ -202,6 +233,8 @@ DiskIoMgr::DiskIoMgr() :
         FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
   DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
+  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
+  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
   if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
     LOG(WARNING) << "Number of disks specified should be between 0 and the 
number of "
@@ -215,11 +248,11 @@ DiskIoMgr::DiskIoMgr() :
 }
 
 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
-    int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t 
max_buffer_size) :
+    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
     num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
     num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
-    max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)),
-    min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)),
+    max_buffer_size_(max_buffer_size),
+    min_buffer_size_(min_buffer_size),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
@@ -227,6 +260,8 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int 
threads_per_rotational_disk,
         FileSystemUtil::MaxNumFileHandles()),
         FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
+  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
+  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
@@ -250,22 +285,37 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == nullptr) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (RequestContext* context : disk_queues_[i]->request_contexts) {
-      unique_lock<mutex> context_lock(context->lock_);
-      DCHECK_EQ(context->disk_states_[disk_id].num_threads_in_op(), 0);
-      DCHECK(context->disk_states_[disk_id].done());
-      context->DecrementDiskRefCount(context_lock);
+    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
+        it != disk_queues_[i]->request_contexts.end(); ++it) {
+      DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
+      DCHECK((*it)->disk_states_[disk_id].done());
+      (*it)->DecrementDiskRefCount();
     }
   }
 
+  DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
+
+  // Delete all allocated buffers
+  int num_free_buffers = 0;
+  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
+    num_free_buffers += free_buffers_[idx].size();
+  }
+  DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
+  GcIoBuffers();
+
   for (int i = 0; i < disk_queues_.size(); ++i) {
     delete disk_queues_[i];
   }
 
+  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
   if (cached_read_options_ != nullptr) 
hadoopRzOptionsFree(cached_read_options_);
 }
 
-Status DiskIoMgr::Init() {
+Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
+  DCHECK(process_mem_tracker != nullptr);
+  free_buffer_mem_tracker_.reset(
+      new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
+
   for (int i = 0; i < disk_queues_.size(); ++i) {
     disk_queues_[i] = new DiskQueue(i);
     int num_threads_per_disk;
@@ -311,14 +361,101 @@ Status DiskIoMgr::Init() {
   return Status::OK();
 }
 
-unique_ptr<RequestContext> DiskIoMgr::RegisterContext() {
-  return unique_ptr<RequestContext>(new RequestContext(this, 
num_total_disks()));
+unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
+  return unique_ptr<RequestContext>(
+      new RequestContext(this, num_total_disks(), mem_tracker));
 }
 
 void DiskIoMgr::UnregisterContext(RequestContext* reader) {
   reader->CancelAndMarkInactive();
 }
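
The pairing of Init(), RegisterContext() and UnregisterContext() above implies the following
lifecycle. This is a minimal sketch, assuming default construction and illustrative tracker
names and limits; only the DiskIoMgr calls themselves come from this patch:

    DiskIoMgr io_mgr;                               // default disk/buffer configuration
    MemTracker process_tracker(-1, "Process");      // assumed process-wide tracker
    Status status = io_mgr.Init(&process_tracker);  // creates the "Free Disk IO Buffers" child tracker
    // One context per reader; IO buffers read for this context are charged to its tracker.
    MemTracker reader_tracker(1024L * 1024L * 1024L, "Reader", &process_tracker);  // assumed 1GB limit
    std::unique_ptr<RequestContext> context = io_mgr.RegisterContext(&reader_tracker);
    // ... AddScanRanges() / GetNextRange() / ScanRange::GetNext() / ReturnBuffer() ...
    io_mgr.UnregisterContext(context.get());        // cancels and drains before destruction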
 
+// Cancellation requires coordination from multiple threads.  Each thread that currently
+// has a reference to the request context must notice the cancel and remove it from its
+// tracking structures.  The last thread to touch the context should deallocate (aka
+// recycle) the request context object.  Potential threads are:
+//  1. Disk threads that are currently reading for this reader.
+//  2. Caller threads that are waiting in GetNext().
+//
+// The steps are:
+// 1. Cancel will immediately set the context in the Cancelled state.  This prevents any
+// other thread from adding more ready buffers to the context (they all take a lock and
+// check the state before doing so), or any write ranges to the context.
+// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
+// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
+// also invokes the callback for the WriteRanges with the cancelled state.
+// 3. Disk threads notice the context is cancelled either when picking the next context
+// to process or when they try to enqueue a ready buffer.  Upon noticing the cancelled
+// state, the disk thread removes the context from its disk queue.  The last thread per
+// disk with an outstanding reference to the context decrements the number of disk
+// queues the context is on.
+void DiskIoMgr::CancelContext(RequestContext* context) {
+  context->Cancel(Status::CANCELLED);
+}
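
A caller-side view of the protocol above, as a sketch (the surrounding scanner code is
assumed; the status propagation follows the code in this patch):

    io_mgr->CancelContext(context.get());       // any thread may cancel
    // Threads blocked in GetNextRange()/ScanRange::GetNext() unblock and observe the
    // cancelled status; WriteRange callbacks are invoked with Status::CANCELLED.
    ScanRange* range = nullptr;
    Status status = io_mgr->GetNextRange(context.get(), &range);
    DCHECK(!status.ok());                       // returns the context's cancelled status
    // The context object itself is only recycled via UnregisterContext(), which blocks
    // until all disk-thread references have been dropped.
    io_mgr->UnregisterContext(context.get());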
+
+void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+  r->read_timer_ = c;
+}
+
+void DiskIoMgr::set_open_file_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+  r->open_file_timer_ = c;
+}
+
+void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
+  r->bytes_read_counter_ = c;
+}
+
+void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
+    RuntimeProfile::Counter* c) {
+  r->active_read_thread_counter_ = c;
+}
+
+void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
+    RuntimeProfile::Counter* c) {
+  r->disks_accessed_bitmap_ = c;
+}
+
+int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
+  return reader->num_ready_buffers_.Load();
+}
+
+Status DiskIoMgr::context_status(RequestContext* context) const {
+  unique_lock<mutex> lock(context->lock_);
+  return context->status_;
+}
+
+int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
+  return reader->bytes_read_local_.Load();
+}
+
+int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
+  return reader->bytes_read_short_circuit_.Load();
+}
+
+int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
+  return reader->bytes_read_dn_cache_.Load();
+}
+
+int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
+  return reader->num_remote_ranges_.Load();
+}
+
+int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
+  return reader->unexpected_remote_bytes_.Load();
+}
+
+int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const {
+  return reader->cached_file_handles_hit_count_.Load();
+}
+
+int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const {
+  return reader->cached_file_handles_miss_count_.Load();
+}
+
+int64_t DiskIoMgr::GetReadThroughput() {
+  return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
+}
+
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
@@ -329,91 +466,84 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
     return Status(TErrorCode::DISK_IO_ERROR,
         Substitute("Invalid scan range. Negative offset $0", range->offset_));
   }
-  if (range->len_ <= 0) {
+  if (range->len_ < 0) {
     return Status(TErrorCode::DISK_IO_ERROR,
-        Substitute("Invalid scan range. Non-positive length $0", range->len_));
+        Substitute("Invalid scan range. Negative length $0", range->len_));
   }
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRanges(
-    RequestContext* reader, const vector<ScanRange*>& ranges) {
-  DCHECK_GT(ranges.size(), 0);
+Status DiskIoMgr::AddScanRanges(RequestContext* reader,
+    const vector<ScanRange*>& ranges, bool schedule_immediately) {
+  if (ranges.empty()) return Status::OK();
+
   // Validate and initialize all ranges
   for (int i = 0; i < ranges.size(); ++i) {
     RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
     ranges[i]->InitInternal(this, reader);
   }
 
+  // disks that this reader needs to be scheduled on.
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
-  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+  if (reader->state_ == RequestContext::Cancelled) {
+    DCHECK(!reader->status_.ok());
+    return reader->status_;
+  }
 
   // Add each range to the queue of the disk the range is on
-  for (ScanRange* range : ranges) {
+  for (int i = 0; i < ranges.size(); ++i) {
     // Don't add empty ranges.
-    DCHECK_NE(range->len(), 0);
-    reader->AddActiveScanRangeLocked(reader_lock, range);
+    DCHECK_NE(ranges[i]->len(), 0);
+    ScanRange* range = ranges[i];
+
     if (range->try_cache_) {
-      reader->cached_ranges_.Enqueue(range);
-    } else {
-      reader->AddRangeToDisk(reader_lock, range, ScheduleMode::UPON_GETNEXT);
+      if (schedule_immediately) {
+        bool cached_read_succeeded;
+        RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
+        if (cached_read_succeeded) continue;
+        // Cached read failed, fall back to AddRequestRange() below.
+      } else {
+        reader->cached_ranges_.Enqueue(range);
+        continue;
+      }
     }
+    reader->AddRequestRange(range, schedule_immediately);
   }
   DCHECK(reader->Validate()) << endl << reader->DebugString();
+
   return Status::OK();
 }
 
-Status DiskIoMgr::StartScanRange(RequestContext* reader, ScanRange* range,
-    bool* needs_buffers) {
-  RETURN_IF_ERROR(ValidateScanRange(range));
-  range->InitInternal(this, reader);
-
-  unique_lock<mutex> reader_lock(reader->lock_);
-  DCHECK(reader->Validate()) << endl << reader->DebugString();
-  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
-
-  DCHECK_NE(range->len(), 0);
-  if (range->try_cache_) {
-    bool cached_read_succeeded;
-    RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
-    if (cached_read_succeeded) {
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      *needs_buffers = false;
-      return Status::OK();
-    }
-    // Cached read failed, fall back to normal read path.
-  }
-  // If we don't have a buffer yet, the caller must allocate buffers for the 
range.
-  *needs_buffers = range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER;
-  if (*needs_buffers) range->SetBlockedOnBuffer();
-  reader->AddActiveScanRangeLocked(reader_lock, range);
-  reader->AddRangeToDisk(reader_lock, range,
-      *needs_buffers ? ScheduleMode::BY_CALLER : ScheduleMode::IMMEDIATELY);
-  DCHECK(reader->Validate()) << endl << reader->DebugString();
-  return Status::OK();
+Status DiskIoMgr::AddScanRange(
+    RequestContext* reader, ScanRange* range, bool schedule_immediately) {
+  return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately);
 }
 
 // This function returns the next scan range the reader should work on, 
checking
 // for eos and error cases. If there isn't already a cached scan range or a 
scan
 // range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** 
range,
-    bool* needs_buffers) {
+Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   DCHECK(reader != nullptr);
   DCHECK(range != nullptr);
   *range = nullptr;
-  *needs_buffers = false;
+  Status status = Status::OK();
 
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
+
   while (true) {
-    if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+    if (reader->state_ == RequestContext::Cancelled) {
+      DCHECK(!reader->status_.ok());
+      status = reader->status_;
+      break;
+    }
 
     if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
         reader->ready_to_start_ranges_.empty() && 
reader->cached_ranges_.empty()) {
       // All ranges are done, just return.
-      return Status::OK();
+      break;
     }
 
     if (!reader->cached_ranges_.empty()) {
@@ -425,7 +555,7 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* 
reader, ScanRange** rang
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new 
range.
-      reader->AddRangeToDisk(reader_lock, *range, ScheduleMode::UPON_GETNEXT);
+      reader->AddRequestRange(*range, false);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       *range = nullptr;
       continue;
@@ -441,72 +571,183 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* 
reader, ScanRange** rang
       // Set this to nullptr, the next time this disk runs for this reader, it 
will
       // get another range ready.
       reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
-      ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag_;
-      if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
-        // We can't schedule this range until the client gives us buffers. The 
context
-        // must be rescheduled regardless to ensure that 
'next_scan_range_to_start' is
-        // refilled.
-        reader->disk_states_[disk_id].ScheduleContext(reader_lock, reader, 
disk_id);
-        (*range)->SetBlockedOnBuffer();
-        *needs_buffers = true;
-      } else {
-        reader->ScheduleScanRange(reader_lock, *range);
-      }
-      return Status::OK();
+      reader->ScheduleScanRange(*range);
+      break;
     }
   }
+  return status;
 }
 
-Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader,
-    BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) {
-  DCHECK_GE(max_bytes, min_buffer_size_);
-  DCHECK(range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER)
-     << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate 
buffers "
-     << "when already reading into an external buffer";
-  BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
-  Status status;
-  vector<unique_ptr<BufferDescriptor>> buffers;
-  for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
-    BufferPool::BufferHandle handle;
-    status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
-    if (!status.ok()) goto error;
-    buffers.emplace_back(new BufferDescriptor(
-        this, reader, range, bp_client, move(handle)));
+Status DiskIoMgr::Read(RequestContext* reader,
+    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
+  DCHECK(range != nullptr);
+  DCHECK(buffer != nullptr);
+  *buffer = nullptr;
+
+  if (range->len() > max_buffer_size_
+      && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
+        "perform sync read of '$0' bytes that is larger than the max read buffer size "
+        "'$1'.", range->len(), max_buffer_size_));
   }
-  range->AddUnusedBuffers(move(buffers), false);
+
+  vector<ScanRange*> ranges;
+  ranges.push_back(range);
+  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
+  RETURN_IF_ERROR(range->GetNext(buffer));
+  DCHECK((*buffer) != nullptr);
+  DCHECK((*buffer)->eosr());
   return Status::OK();
- error:
-  DCHECK(!status.ok());
-  range->CleanUpBuffers(move(buffers));
-  return status;
 }
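
Read() above is the synchronous path: it adds the range with schedule_immediately = true and
then blocks in ScanRange::GetNext() for the single result buffer. A usage sketch, assuming the
ScanRange was initialized elsewhere and that buffer()/len() accessors exist on BufferDescriptor
(they are not shown in this hunk); ProcessData() is a hypothetical consumer:

    std::unique_ptr<BufferDescriptor> buffer;
    RETURN_IF_ERROR(io_mgr->Read(context.get(), range, &buffer));  // range->len() <= max_buffer_size_
    DCHECK(buffer->eosr());                        // the whole range arrives in one buffer
    ProcessData(buffer->buffer(), buffer->len());  // assumed accessors for data pointer and length
    io_mgr->ReturnBuffer(move(buffer));            // frees/recycles the buffer and closes the range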
 
-vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t 
max_bytes) {
-  DCHECK_GE(max_bytes, min_buffer_size_);
-  vector<int64_t> buffer_sizes;
-  int64_t bytes_allocated = 0;
-  while (bytes_allocated < scan_range_len) {
-    int64_t bytes_remaining = scan_range_len - bytes_allocated;
-    // Either allocate a max-sized buffer or a smaller buffer to fit the rest 
of the
-    // range.
-    int64_t next_buffer_size;
-    if (bytes_remaining >= max_buffer_size_) {
-      next_buffer_size = max_buffer_size_;
+void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  DCHECK(buffer_desc != nullptr);
+  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
+
+  RequestContext* reader = buffer_desc->reader_;
+  if (buffer_desc->buffer_ != nullptr) {
+    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
+      // Buffers that were not allocated by the DiskIoMgr don't need to be freed.
+      FreeBufferMemory(buffer_desc.get());
+    }
+    buffer_desc->buffer_ = nullptr;
+    num_buffers_in_readers_.Add(-1);
+    reader->num_buffers_in_reader_.Add(-1);
+  } else {
+    // A nullptr buffer means there was an error in which case there is no buffer
+    // to return.
+  }
+
+  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
+    // Need to close the scan range if returning the last buffer or the scan range
+    // has been cancelled (and the caller might never get the last buffer).
+    // Close() is idempotent so multiple cancelled buffers is okay.
+    buffer_desc->scan_range_->Close();
+  }
+}
+
+unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
+    RequestContext* reader, ScanRange* range, int64_t buffer_size) {
+  DCHECK_LE(buffer_size, max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
+  int idx = free_buffers_idx(buffer_size);
+  // Quantize buffer size to nearest power of 2 greater than the specified buffer size
+  // and convert to bytes.
+  buffer_size = (1LL << idx) * min_buffer_size_;
+
+  // Track memory against the reader. This is checked the next time we start
+  // a read for the next reader in DiskIoMgr::GetNextRequestRange().
+  DCHECK(reader->mem_tracker_ != nullptr);
+  reader->mem_tracker_->Consume(buffer_size);
+
+  uint8_t* buffer = nullptr;
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (free_buffers_[idx].empty()) {
+      num_allocated_buffers_.Add(1);
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
+      }
+      // We already tracked this memory against the reader's MemTracker.
+      buffer = new uint8_t[buffer_size];
     } else {
-      next_buffer_size =
-          max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining));
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+      }
+      buffer = free_buffers_[idx].front();
+      free_buffers_[idx].pop_front();
+      free_buffer_mem_tracker_->Release(buffer_size);
+      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
     }
-    if (next_buffer_size + bytes_allocated > max_bytes) {
-      // Can't allocate the desired buffer size. Make sure to allocate at 
least one
-      // buffer.
-      if (bytes_allocated > 0) break;
-      next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes);
+  }
+
+  // Validate more invariants.
+  DCHECK(range != nullptr);
+  DCHECK(reader != nullptr);
+  DCHECK(buffer != nullptr);
+  return unique_ptr<BufferDescriptor>(new BufferDescriptor(
+      this, reader, range, buffer, buffer_size, reader->mem_tracker_));
+}
+
+void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
+  unique_lock<mutex> lock(free_buffers_lock_);
+  int buffers_freed = 0;
+  int bytes_freed = 0;
+  // Free small-to-large to avoid retaining many small buffers and fragmenting memory.
+  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
+    deque<uint8_t*>* free_buffers = &free_buffers_[idx];
+    while (
+        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
+      uint8_t* buffer = free_buffers->front();
+      free_buffers->pop_front();
+      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
+      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
+      delete[] buffer;
+      free_buffer_mem_tracker_->Release(buffer_size);
+      num_allocated_buffers_.Add(-1);
+
+      ++buffers_freed;
+      bytes_freed += buffer_size;
+    }
+    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
+  }
+
+  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
+  }
+  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+    ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
+  }
+  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
+  }
+}
+
+void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
+  DCHECK(!desc->is_cached());
+  DCHECK(!desc->is_client_buffer());
+  uint8_t* buffer = desc->buffer_;
+  int64_t buffer_size = desc->buffer_len_;
+  int idx = free_buffers_idx(buffer_size);
+  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
+      << "buffer_size_ / min_buffer_size_ should be power of 2, got 
buffer_size = "
+      << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
+
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (!FLAGS_disable_mem_pools &&
+        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
+      // Poison buffers stored in cache.
+      ASAN_POISON_MEMORY_REGION(buffer, buffer_size);
+      free_buffers_[idx].push_back(buffer);
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
+      }
+      // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
+      // with a Release() call for the same buffer that could make consumption negative.
+      // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
+      // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
+      // pool's free lists, and remove these free lists.
+      free_buffer_mem_tracker_->Consume(buffer_size);
+    } else {
+      num_allocated_buffers_.Add(-1);
+      delete[] buffer;
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
+      }
     }
-    DCHECK(BitUtil::IsPowerOf2(next_buffer_size)) << next_buffer_size;
-    buffer_sizes.push_back(next_buffer_size);
-    bytes_allocated += next_buffer_size;
   }
-  return buffer_sizes;
+
+  // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
+  desc->mem_tracker_->Release(buffer_size);
+  desc->buffer_ = nullptr;
 }
 
 // This function gets the next RequestRange to work on for this disk. It 
checks for
@@ -546,7 +787,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
       disk_queue->request_contexts.pop_front();
       DCHECK(*request_context != nullptr);
       request_disk_state = &((*request_context)->disk_states_[disk_id]);
-      request_disk_state->IncrementDiskThreadAndDequeue();
+      request_disk_state->IncrementRequestThreadAndDequeue();
     }
 
     // NOTE: no locks were taken in between.  We need to be careful about what 
state
@@ -554,13 +795,27 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* 
disk_queue, RequestRange** range,
     // There are some invariants here.  Only one disk thread can have the
     // same reader here (the reader is removed from the queue).  There can be
     // other disk threads operating on this reader in other functions though.
+
+    // We just picked a reader. Before we may allocate a buffer on its behalf, check that
+    // it has not exceeded any memory limits (e.g. the query or process limit).
+    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
+    // move these memory limit checks to GetFreeBuffer().
+    // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
+    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
+    // pushing more buffers to it. Most readers can make progress and operate within
+    // a fixed memory limit.
+    if ((*request_context)->mem_tracker_ != nullptr
+        && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
+      (*request_context)->Cancel(Status::MemLimitExceeded());
+    }
+
     unique_lock<mutex> request_lock((*request_context)->lock_);
     VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
         << (*request_context)->DebugString();
 
     // Check if reader has been cancelled
     if ((*request_context)->state_ == RequestContext::Cancelled) {
-      request_disk_state->DecrementDiskThread(request_lock, *request_context);
+      request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
       continue;
     }
 
@@ -571,16 +826,16 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* 
disk_queue, RequestRange** range,
         !request_disk_state->unstarted_scan_ranges()->empty()) {
       // We don't have a range queued for this disk for what the caller should
       // read next. Populate that.  We want to have one range waiting to 
minimize
-      // wait time in GetNextUnstartedRange().
+      // wait time in GetNextRange.
       ScanRange* new_range = 
request_disk_state->unstarted_scan_ranges()->Dequeue();
       (*request_context)->num_unstarted_scan_ranges_.Add(-1);
       (*request_context)->ready_to_start_ranges_.Enqueue(new_range);
       request_disk_state->set_next_scan_range_to_start(new_range);
 
       if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
-        // All the ranges have been started, notify everyone blocked on
-        // GetNextUnstartedRange(). Only one of them will get work so make 
sure to return
-        // nullptr to the other caller threads.
+        // All the ranges have been started, notify everyone blocked on GetNextRange.
+        // Only one of them will get work so make sure to return nullptr to the other
+        // caller threads.
         (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
       } else {
         (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
@@ -603,7 +858,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
 
     // There are no inflight ranges, nothing to do.
     if (request_disk_state->in_flight_ranges()->empty()) {
-      request_disk_state->DecrementDiskThread(request_lock, *request_context);
+      request_disk_state->DecrementRequestThread();
       continue;
     }
     DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
@@ -612,7 +867,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
 
     // Now that we've picked a request range, put the context back on the 
queue so
     // another thread can pick up another request range for this context.
-    request_disk_state->ScheduleContext(request_lock, *request_context, 
disk_id);
+    request_disk_state->ScheduleContext(*request_context, disk_id);
     DCHECK((*request_context)->Validate()) << endl << 
(*request_context)->DebugString();
     return true;
   }
@@ -626,62 +881,81 @@ void DiskIoMgr::HandleWriteFinished(
   // Copy disk_id before running callback: the callback may modify write_range.
   int disk_id = write_range->disk_id_;
 
-  // Execute the callback before decrementing the thread count. Otherwise
-  // RequestContext::Cancel() that waits for the disk ref count to be 0 will
-  // return, creating a race, e.g. see IMPALA-1890.
+  // Execute the callback before decrementing the thread count. Otherwise CancelContext()
+  // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
+  // IMPALA-1890.
   // The status of the write does not affect the status of the writer context.
   write_range->callback_(write_status);
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
     RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
-    state.DecrementDiskThread(writer_lock, writer);
+    if (writer->state_ == RequestContext::Cancelled) {
+      state.DecrementRequestThreadAndCheckDone(writer);
+    } else {
+      state.DecrementRequestThread();
+    }
     --state.num_remaining_ranges();
   }
 }
 
 void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* 
reader,
-    Status read_status, unique_ptr<BufferDescriptor> buffer) {
+    unique_ptr<BufferDescriptor> buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
-  RequestContext::PerDiskState* disk_state = 
&reader->disk_states_[disk_queue->disk_id];
+  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-  DCHECK_GT(disk_state->num_threads_in_op(), 0);
+  DCHECK_GT(state.num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != nullptr);
-  DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code 
path.";
 
-  // After calling EnqueueReadyBuffer() below, it is no longer valid to read 
from buffer.
-  // Store the state we need before calling EnqueueReadyBuffer().
-  bool eosr = buffer->eosr_;
+  if (reader->state_ == RequestContext::Cancelled) {
+    state.DecrementRequestThreadAndCheckDone(reader);
+    DCHECK(reader->Validate()) << endl << reader->DebugString();
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
+    buffer->buffer_ = nullptr;
+    ScanRange* scan_range = buffer->scan_range_;
+    scan_range->Cancel(reader->status_);
+    // Enqueue the buffer to use the scan range's buffer cleanup path.
+    scan_range->EnqueueBuffer(reader_lock, move(buffer));
+    return;
+  }
+
+  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK(buffer->buffer_ != nullptr);
 
-  // TODO: IMPALA-4249: it safe to touch 'scan_range' until 
DecrementDiskThread() is
-  // called because all clients of DiskIoMgr keep ScanRange objects alive 
until they
-  // unregister their RequestContext.
+  // Update the reader's scan ranges.  There are three cases here:
+  //  1. Read error
+  //  2. End of scan range
+  //  3. Middle of scan range
+  if (!buffer->status_.ok()) {
+    // Error case
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
+    buffer->buffer_ = nullptr;
+    buffer->eosr_ = true;
+    --state.num_remaining_ranges();
+    buffer->scan_range_->Cancel(buffer->status_);
+  } else if (buffer->eosr_) {
+    --state.num_remaining_ranges();
+  }
+
+  // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
+  // Store the state we need before calling EnqueueBuffer().
+  bool eosr = buffer->eosr_;
   ScanRange* scan_range = buffer->scan_range_;
-  bool scan_range_done = eosr;
-  if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
-    DCHECK_EQ(reader->state_, RequestContext::Active);
-    // Read successfully - update the reader's scan ranges.  There are two 
cases here:
-    //  1. End of scan range or cancelled scan range - don't need to 
reschedule.
-    //  2. Middle of scan range - need to schedule to read next buffer.
-    bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer));
-    if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range);
+  bool is_cached = buffer->is_cached();
+  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
+  if (eosr) {
+    // For cached buffers, we can't close the range until the cached buffer is returned.
+    // Close() is called from DiskIoMgr::ReturnBuffer().
+    if (!is_cached) scan_range->Close();
   } else {
-    // The scan range will be cancelled, either because we hit an error or 
because the
-    // request context was cancelled.  The buffer is not needed - we must free 
it.
-    reader->FreeBuffer(buffer.get());
-    // Propagate 'read_status' to the scan range. If we are here because the 
context
-    // was cancelled, the scan range is already cancelled so we do not need to 
re-cancel
-    // it.
-    if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, 
read_status);
-    scan_range_done = true;
-  }
-  if (scan_range_done) {
-    scan_range->Close();
-    --disk_state->num_remaining_ranges();
+    if (queue_full) {
+      reader->blocked_ranges_.Enqueue(scan_range);
+    } else {
+      reader->ScheduleScanRange(scan_range);
+    }
   }
-  DCHECK(reader->Validate()) << endl << reader->DebugString();
-  disk_state->DecrementDiskThread(reader_lock, reader);
+  state.DecrementRequestThread();
 }
 
 void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
@@ -697,12 +971,14 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   //   3. Perform the read or write as specified.
   // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    RequestContext* worker_context = nullptr;
+    RequestContext* worker_context = nullptr;;
     RequestRange* range = nullptr;
+
     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
       DCHECK(shut_down_);
-      return;
+      break;
     }
+
     if (range->request_type() == RequestType::READ) {
       ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
     } else {
@@ -710,8 +986,12 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
       Write(worker_context, static_cast<WriteRange*>(range));
     }
   }
+
+  DCHECK(shut_down_);
 }
 
+// This function reads the specified scan range associated with the
+// specified reader context and disk queue.
 void DiskIoMgr::ReadRange(
     DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -719,40 +999,86 @@ void DiskIoMgr::ReadRange(
   unique_ptr<BufferDescriptor> buffer_desc;
   if (range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
     buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, 
reader, range,
-        range->client_buffer_.data, range->client_buffer_.len));
+        range->client_buffer_.data, range->client_buffer_.len, nullptr));
   } else {
-    DCHECK(range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER)
-        << "This code path does not handle other buffer types, i.e. HDFS cache"
-        << static_cast<int>(range->external_buffer_tag_);
-    buffer_desc = range->GetNextUnusedBufferForRange();
-    if (buffer_desc == nullptr) {
-      // No buffer available - the range will be rescheduled when a buffer is 
added.
-      unique_lock<mutex> reader_lock(reader->lock_);
-      
reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, 
reader);
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      return;
-    }
+    // Need to allocate a buffer to read into.
+    int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
+    buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
+    if (buffer_desc == nullptr) return;
   }
+  reader->num_used_buffers_.Add(1);
 
   // No locks in this section.  Only working on local vars.  We don't want to 
hold a
   // lock across the read call.
-  Status read_status = range->Open(detail::is_file_handle_caching_enabled());
-  if (read_status.ok()) {
+  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
+  if (buffer_desc->status_.ok()) {
     // Update counters.
-    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
-    COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << 
disk_queue->disk_id);
+    if (reader->active_read_thread_counter_) {
+      reader->active_read_thread_counter_->Add(1L);
+    }
+    if (reader->disks_accessed_bitmap_) {
+      int64_t disk_bit = 1LL << disk_queue->disk_id;
+      reader->disks_accessed_bitmap_->BitOr(disk_bit);
+    }
 
-    read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+    buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
         &buffer_desc->len_, &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
-    COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_);
+    if (reader->bytes_read_counter_ != nullptr) {
+      COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
+    }
+
     COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
-    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L);
+    if (reader->active_read_thread_counter_) {
+      reader->active_read_thread_counter_->Add(-1L);
+    }
   }
 
   // Finished read, update reader/disk based on the results
-  HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
+  HandleReadFinished(disk_queue, reader, move(buffer_desc));
+}
+
+unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
+    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
+    int64_t buffer_size) {
+  DCHECK(reader->mem_tracker_ != nullptr);
+  bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  if (!enough_memory) {
+    // Low memory, GC all the buffers and try again.
+    GcIoBuffers();
+    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  }
+
+  if (!enough_memory) {
+    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    unique_lock<mutex> reader_lock(reader->lock_);
+
+    // Just grabbed the reader lock, check for cancellation.
+    if (reader->state_ == RequestContext::Cancelled) {
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      state.DecrementRequestThreadAndCheckDone(reader);
+      range->Cancel(reader->status_);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      return nullptr;
+    }
+
+    if (!range->ready_buffers_.empty()) {
+      // We have memory pressure and this range doesn't need another buffer
+      // (it already has one queued). Skip this range and pick it up later.
+      range->blocked_on_queue_ = true;
+      reader->blocked_ranges_.Enqueue(range);
+      state.DecrementRequestThread();
+      return nullptr;
+    } else {
+      // We need to get a buffer anyway since there are none queued. The query
+      // is likely to fail due to mem limits but there's nothing we can do about that
+      // now.
+    }
+  }
+  unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size);
+  DCHECK(buffer_desc != nullptr);
+  return buffer_desc;
 }
 
 void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) 
{
@@ -806,14 +1132,30 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, 
WriteRange* write_range) {
         Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 
description=$3",
         write_range->len_, write_range->file_, errno, GetStrErrMsg())));
   }
-  ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
+  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
+    ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
+  }
+
   return Status::OK();
 }
 
+int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
+  int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
+  int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
+  DCHECK_GE(idx, 0);
+  DCHECK_LT(idx, free_buffers_.size());
+  return idx;
+}
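
A worked example of the quantization done by free_buffers_idx() and GetFreeBuffer(), assuming
min_buffer_size_ = 1024 and max_buffer_size_ = 8MB (illustrative values, not taken from this
patch):

    // requested bytes  BitUtil::Ceil(req, 1024)  idx = Log2Ceiling64(..)  quantized buffer size
    //        1024                 1                        0              1 KB  (2^0  * 1024)
    //        5000                 5                        3              8 KB  (2^3  * 1024)
    //     1048576              1024                       10              1 MB  (2^10 * 1024)
    //     8388608              8192                       13              8 MB  (2^13 * 1024)
    //
    // free_buffers_ therefore needs Log2Ceiling64(8192) + 1 = 14 size classes, which matches
    // the resize() calls in both DiskIoMgr constructors.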
+
 Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* 
write_range) {
   unique_lock<mutex> writer_lock(writer->lock_);
-  if (writer->state_ == RequestContext::Cancelled) return Status::CANCELLED;
-  writer->AddRangeToDisk(writer_lock, write_range, ScheduleMode::IMMEDIATELY);
+
+  if (writer->state_ == RequestContext::Cancelled) {
+    DCHECK(!writer->status_.ok());
+    return writer->status_;
+  }
+
+  writer->AddRequestRange(write_range, false);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 2b6881b..e5d7eb4 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -30,7 +30,6 @@
 #include "common/hdfs.h"
 #include "common/object-pool.h"
 #include "common/status.h"
-#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/handle-cache.h"
 #include "runtime/io/request-ranges.h"
 #include "runtime/thread-resource-mgr.h"
@@ -43,19 +42,21 @@
 
 namespace impala {
 
+class MemTracker;
+
 namespace io {
 /// Manager object that schedules IO for all queries on all disks and remote 
filesystems
 /// (such as S3). Each query maps to one or more RequestContext objects, each 
of which
 /// has its own queue of scan ranges and/or write ranges.
-///
+//
 /// The API splits up requesting scan/write ranges (non-blocking) and reading 
the data
 /// (blocking). The DiskIoMgr has worker threads that will read from and write 
to
 /// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This 
allows us to
 /// keep all disks and all cores as busy as possible.
-///
+//
 /// All public APIs are thread-safe. It is not valid to call any of the APIs 
after
 /// UnregisterContext() returns.
-///
+//
 /// For Readers:
 /// We can model this problem as a multiple producer (threads for each disk), 
multiple
 /// consumer (scan ranges) problem. There are multiple queues that need to be
@@ -67,102 +68,84 @@ namespace io {
 /// Readers map to scan nodes. The reader then contains a queue of scan 
ranges. The caller
 /// asks the IoMgr for the next range to process. The IoMgr then selects the 
best range
 /// to read based on disk activity and begins reading and queuing buffers for 
that range.
-///
+/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries
+/// that have multiple scan nodes shouldn't have more 'turns'.
+//
 /// For Writers:
 /// Data is written via AddWriteRange(). This is non-blocking and adds a 
WriteRange to a
 /// per-disk queue. After the write is complete, a callback in WriteRange is 
invoked.
 /// No memory is allocated within IoMgr for writes and no copies are made. It 
is the
 /// responsibility of the client to ensure that the data to be written is 
valid and that
 /// the file to be written to exists until the callback is invoked.
-///
-/// There are several key methods for scanning data with the IoMgr.
-///  1. StartScanRange(): adds range to the IoMgr to start immediately.
-///  2. AddScanRanges(): adds ranges to the IoMgr that the reader wants to 
scan, but does
-///     not start them until GetNextUnstartedRange() is called.
-///  3. GetNextUnstartedRange(): returns to the caller the next scan range it 
should
-///     process.
-///  4. ScanRange::GetNext(): returns the next buffer for this range, blocking 
until
-///     data is available.
-///
+//
+/// The IoMgr provides three key APIs.
+///  1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that
+///     will eventually need to be read.
+///  2. GetNextRange: returns to the caller the next scan range it should process.
+///     This is based on disk load. This also begins reading the data in this scan
+///     range. This is blocking.
+///  3. ScanRange::GetNext: returns the next buffer for this range.  This is blocking.
+//
 /// The disk threads do not synchronize with each other. The readers and 
writers don't
 /// synchronize with each other. There is a lock and condition variable for 
each request
 /// context queue and each disk queue.
 /// IMPORTANT: whenever both locks are needed, the lock order is to grab the 
context lock
 /// before the disk lock.
-///
+//
 /// Scheduling: If there are multiple request contexts with work for a single 
disk, the
 /// request contexts are scheduled in round-robin order. Multiple disk threads 
can
 /// operate on the same request context. Exactly one request range is 
processed by a
-/// disk thread at a time. If there are multiple scan ranges scheduled for a 
single
-/// context, these are processed in round-robin order.
+/// disk thread at a time. If there are multiple scan ranges scheduled via
+/// GetNextRange() for a single context, these are processed in round-robin order.
 /// If there are multiple scan and write ranges for a disk, a read is always 
followed
 /// by a write, and a write is followed by a read, i.e. reads and writes 
alternate.
 /// If multiple write ranges are enqueued for a single disk, they will be 
processed
 /// by the disk threads in order, but may complete in any order. No guarantees 
are made
 /// on ordering of writes across disks.
-///
-/// Resource Management: the IoMgr is designed to share the available disk I/O 
capacity
-/// between many clients and to help use the available I/O capacity 
efficiently. The IoMgr
-/// interfaces are designed to let clients manage their own CPU and memory 
usage while the
-/// IoMgr manages the allocation of the I/O capacity of different I/O devices 
to scan
-/// ranges of different clients.
-///
-/// IoMgr clients may want to work on multiple scan ranges at a time to 
maximize CPU and
-/// I/O utilization. Clients can call GetNextUnstartedRange() to start as many 
concurrent
-/// scan ranges as required, e.g. from each parallel scanner thread. Once a 
scan range has
-/// been returned via GetNextUnstartedRange(), the caller must allocate any 
memory needed
-/// for buffering reads, after which the IoMgr wil start to fill the buffers 
with data
-/// while the caller concurrently consumes and processes the data. For 
example, the logic
-/// in a scanner thread might look like:
+//
+/// Resource Management: effective resource management in the IoMgr is key to good
+/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU,
+/// spinning up too many threads causes thrashing.
+/// Memory usage in the IoMgr comes from queued read buffers.  If we queue the minimum
+/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't
+/// limit the queue, then it is possible we end up queueing the entire data set (i.e. CPU
+/// is slower than disks) and run out of memory.
+/// For both CPU and memory, we want to model the machine as having a fixed amount of
+/// resources.  If a single query is running, it should saturate either CPU or Disk
+/// as well as using as little memory as possible. With multiple queries, each query
+/// should get less CPU. In that case each query will need fewer queued buffers and
+/// therefore have less memory usage.
+//
+/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange
+/// API which will return the next scan range the caller should process. The caller
+/// can call this from the desired number of reading threads. Once a scan range
+/// has been returned via GetNextRange, the IoMgr will start to buffer reads for
+/// that range and it is expected the caller will pull those buffers promptly. For
+/// example, if the caller would like to have 1 scanner thread, the read loop
+/// would look like:
 ///   while (more_ranges)
-///     range = GetNextUnstartedRange()
+///     range = GetNextRange()
 ///     while (!range.eosr)
 ///       buffer = range.GetNext()
-///
-/// Note that the IoMgr rather than the client is responsible for choosing 
which scan
-/// range to process next, which allows optimizations like distributing load 
across disks.
-///
-/// Buffer Management:
-/// Buffers for reads are either a) allocated on behalf of the caller with
-/// AllocateBuffersForRange() ("IoMgr-allocated"), b) cached HDFS buffers if 
the scan
-/// range was read from the HDFS cache, or c) a client buffer, large enough to 
fit the
-/// whole scan range's data, that is provided by the caller when constructing 
the
+/// To have multiple reading threads, the caller would simply spin up the threads
+/// and each would process the loops above.
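
A slightly fuller C++ sketch of the loop above, using only the APIs named in this header; the
per-buffer processing (ProcessBuffer) and error handling are assumptions:

    // One of N scanner threads; each thread runs the same loop.
    while (true) {
      ScanRange* range = nullptr;
      RETURN_IF_ERROR(io_mgr->GetNextRange(context.get(), &range));
      if (range == nullptr) break;                 // all ranges for this context are done
      bool eosr = false;
      while (!eosr) {
        std::unique_ptr<BufferDescriptor> buffer;
        RETURN_IF_ERROR(range->GetNext(&buffer));  // blocks until a buffer is ready
        eosr = buffer->eosr();
        ProcessBuffer(buffer.get());               // assumed scanner-side processing
        io_mgr->ReturnBuffer(move(buffer));        // return promptly so the IoMgr keeps reading
      }
    }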
+//
+/// To control the number of IO buffers, each scan range has a limit of two queued
+/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
+/// the IoMgr will no longer read for that scan range until the caller has processed
+/// a buffer. Assuming the client returns each buffer before requesting the next one
+/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
 /// scan range.
+//
+/// Buffer Management:
+/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
+/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the
+/// caller when constructing the scan range.
 ///
-/// All three kinds of buffers are wrapped in BufferDescriptors before 
returning to the
-/// caller. The caller must always call ReturnBuffer() on the buffer 
descriptor to allow
-/// recycling of the buffer memory and to release any resources associated 
with the buffer
-/// or scan range.
-///
-/// In case a), ReturnBuffer() may re-enqueue the buffer for GetNext() to 
return again if
-/// needed. E.g. if 24MB of buffers were allocated to read a 64MB scan range, 
each buffer
-/// must be returned multiple times. Callers must be careful to call 
ReturnBuffer() with
-/// the previous buffer returned from the range before calling before 
GetNext() so that
-/// at least one buffer is available for the I/O mgr to read data into. 
Calling GetNext()
-/// when the scan range has no buffers to read data into causes a resource 
deadlock.
-/// NB: if the scan range was allocated N buffers, then it's always ok for the 
caller
-/// to hold onto N - 1 buffers, but currently the IoMgr doesn't give the 
caller a way
-/// to determine the value of N.
-///
-/// If the caller wants to maximize I/O throughput, it can give the range 
enough memory
-/// for 3 max-sized buffers per scan range. Having two queued buffers (plus 
the buffer
-/// that is currently being processed by the client) gives good performance in 
most
-/// scenarios:
-/// 1. If the consumer is consuming data faster than we can read from disk, 
then the
-///    queue will be empty most of the time because the buffer will be 
immediately
-///    pulled off the queue as soon as it is added. There will always be an 
I/O request
-///    in the disk queue to maximize I/O throughput, which is the bottleneck 
in this
-///    case.
-/// 2. If we can read from disk faster than the consumer is consuming data, 
the queue
-///    will fill up and there will always be a buffer available for the 
consumer to
-///    read, so the consumer will not block and we maximize consumer 
throughput, which
-///    is the bottleneck in this case.
-/// 3. If the consumer is consuming data at approximately the same rate as we 
are
-///    reading from disk, then the steady state is that the consumer is 
processing one
-///    buffer and one buffer is in the disk queue. The additional buffer can 
absorb
-///    bursts where the producer runs faster than the consumer or the consumer 
runs
-///    faster than the producer without blocking either the producer or 
consumer.
-/// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE.
+/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
+/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer
+/// descriptor to allow recycling of the associated buffer (if there is an
+/// IoMgr-allocated or HDFS cached buffer).
 ///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In 
that
@@ -178,13 +161,13 @@ namespace io {
 ///   - HDFS will time us out if we hold onto the mlock for too long
 ///   - Holding the lock prevents uncaching this file due to a caching policy 
change.
 /// Therefore, we only issue the cached read when the caller is ready to 
process the
-/// range (GetNextUnstartedRange()) instead of when the ranges are issued. 
This guarantees
-/// that there will be a CPU available to process the buffer and any 
throttling we do with
+/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that
+/// there will be a CPU available to process the buffer and any throttling we do with
 /// the number of scanner threads properly controls the amount of files we 
mlock.
 /// With cached scan ranges, we cannot close the scan range until the cached 
buffer
 /// is returned (HDFS does not allow this). We therefore need to defer the 
close until
 /// the cached buffer is returned (ReturnBuffer()).
-///
+//
 /// Remote filesystem support (e.g. S3):
 /// Remote filesystems are modeled as "remote disks". That is, there is a separate disk
 /// queue for each supported remote filesystem type. In order to maximize 
throughput,
@@ -193,13 +176,12 @@ namespace io {
 /// intensive than local disk/hdfs because of non-direct I/O and SSL 
processing, and can
 /// be CPU bottlenecked especially if not enough I/O threads for these queues 
are
 /// started.
-///
-/// TODO: We should implement more sophisticated resource management. 
Currently readers
-/// are the unit of scheduling and we attempt to distribute IOPS between them. 
Instead
-/// it would be better to have policies based on queries, resource pools, etc.
+//
 /// TODO: IoMgr should be able to request additional scan ranges from the 
coordinator
 /// to help deal with stragglers.
-///
+/// TODO: look into using a lock free queue
+/// TODO: simplify the common path (less locking, memory allocations).
+//
 /// Structure of the Implementation:
 ///  - All client APIs are defined in this file, request-ranges.h and 
request-context.h.
 ///    Clients can include only the files that they need.
@@ -222,12 +204,10 @@ class DiskIoMgr : public CacheLineAligned {
   ///    disk. This is also the max queue depth.
   ///  - threads_per_solid_state_disk: number of read threads to create per 
solid state
   ///    disk. This is also the max queue depth.
-  ///  - min_buffer_size: minimum io buffer size (in bytes). Will be rounded 
down to the
-  //     nearest power-of-two.
-  ///  - max_buffer_size: maximum io buffer size (in bytes). Will be rounded 
up to the
-  ///    nearest power-of-two. Also the max read size.
+  ///  - min_buffer_size: minimum io buffer size (in bytes)
+  ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read 
size.
   DiskIoMgr(int num_disks, int threads_per_rotational_disk,
-      int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t 
max_buffer_size);
+      int threads_per_solid_state_disk, int min_buffer_size, int 
max_buffer_size);
 
   /// Create DiskIoMgr with default configs.
   DiskIoMgr();
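For example, a test or standalone tool might construct and initialize the manager explicitly; the values below are illustrative only, not the defaults, and 'process_mem_tracker' is assumed to be the process-wide MemTracker passed to Init().

    // Sketch: 1 disk, 1 thread per rotational disk, 8 per SSD,
    // 1 KB min buffer, 8 MB max buffer.
    DiskIoMgr io_mgr(1, 1, 8, 1024, 8 * 1024 * 1024);
    RETURN_IF_ERROR(io_mgr.Init(process_mem_tracker));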
@@ -237,13 +217,16 @@ class DiskIoMgr : public CacheLineAligned {
   ~DiskIoMgr();
 
   /// Initialize the IoMgr. Must be called once before any of the other APIs.
-  Status Init() WARN_UNUSED_RESULT;
-
+  Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
 
   /// Allocates tracking structure for a request context.
   /// Register a new request context and return it to the caller. The caller 
must call
   /// UnregisterContext() for each context.
-  std::unique_ptr<RequestContext> RegisterContext();
+  /// reader_mem_tracker: Is non-null only for readers. IO buffers
+  ///    used for this reader will be tracked by this. If the limit is exceeded
+  ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned 
via
+  ///    GetNext().
+  std::unique_ptr<RequestContext> RegisterContext(MemTracker* 
reader_mem_tracker);
 
   /// Unregisters context from the disk IoMgr by first cancelling it then 
blocking until
   /// all references to the context are removed from I/O manager internal data 
structures.
@@ -253,60 +236,50 @@ class DiskIoMgr : public CacheLineAligned {
   /// up.
   void UnregisterContext(RequestContext* context);
 
-  /// Adds the scan ranges to reader's queues, but does not start scheduling 
it. The range
-  /// can be scheduled by a thread calling GetNextUnstartedRange(). This call 
is
-  /// non-blocking. The caller must not deallocate the scan range pointers 
before
-  /// UnregisterContext(). 'ranges' must not be empty.
-  Status AddScanRanges(
-      RequestContext* reader, const std::vector<ScanRange*>& ranges) 
WARN_UNUSED_RESULT;
-
-  /// Adds the scan range to the queues, as with AddScanRanges(), but 
immediately
-  /// start scheduling the scan range. This can be used to do synchronous 
reads as well
-  /// as schedule dependent ranges, e.g. for columnar formats. This call is 
non-blocking.
-  /// The caller must not deallocate the scan range pointers before 
UnregisterContext().
-  ///
-  /// If this returns true in '*needs_buffers', the caller must then call
-  /// AllocateBuffersForRange() to add buffers for the data to be read into 
before the
-  /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr 
will
-  /// asynchronously read the data for the range and the caller can call
-  /// ScanRange::GetNext() to read the data.
-  Status StartScanRange(
-      RequestContext* reader, ScanRange* range, bool* needs_buffers) 
WARN_UNUSED_RESULT;
+  /// This function cancels the context asynchronously. All outstanding requests
+  /// are aborted and tracking structures cleaned up. This does not need to be
+  /// called if the context finishes normally.
+  /// This will also fail any outstanding GetNext()/Read requests.
+  void CancelContext(RequestContext* context);
+
+  /// Adds the scan ranges to the queues. This call is non-blocking. The 
caller must
+  /// not deallocate the scan range pointers before UnregisterContext().
+  /// If schedule_immediately, the ranges are immediately put on the read queue
+  /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
+  /// This can be used to do synchronous reads as well as schedule dependent 
ranges,
+  /// as in the case for columnar formats.
+  Status AddScanRanges(RequestContext* reader,
+      const std::vector<ScanRange*>& ranges,
+      bool schedule_immediately = false) WARN_UNUSED_RESULT;
+  Status AddScanRange(RequestContext* reader, ScanRange* range,
+      bool schedule_immediately = false) WARN_UNUSED_RESULT;
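For instance (a sketch; 'reader' is the RequestContext registered earlier, and 'ranges' and 'footer_range' are scan ranges the caller is assumed to have already set up):

    // Queue ranges to be scheduled later via GetNextRange() ...
    RETURN_IF_ERROR(io_mgr->AddScanRanges(reader.get(), ranges));
    // ... or start a dependent range (e.g. a footer in a columnar file) right away;
    // the caller then reads it directly instead of waiting for GetNextRange().
    RETURN_IF_ERROR(io_mgr->AddScanRange(reader.get(), footer_range,
        /* schedule_immediately */ true));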
 
   /// Add a WriteRange for the writer. This is non-blocking and schedules the 
context
   /// on the IoMgr disk queue. Does not create any files.
   Status AddWriteRange(
       RequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;
 
-  /// Tries to get an unstarted scan range that was added to 'reader' with
-  /// AddScanRanges(). On success, returns OK and returns the range in 
'*range'.
-  /// If 'reader' was cancelled, returns CANCELLED. If another error is 
encountered,
-  /// an error status is returned. Otherwise, if error or cancellation wasn't 
encountered
-  /// and there are no unstarted ranges for 'reader', returns OK and sets 
'*range' to
-  /// nullptr.
-  ///
-  /// If '*needs_buffers' is returned as true, the caller must call
-  /// AllocateBuffersForRange() to add buffers for the data to be read into 
before the
-  /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr 
will
-  /// asynchronously read the data for the range and the caller can call
-  /// ScanRange::GetNext() to read the data.
-  Status GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
-      bool* needs_buffers) WARN_UNUSED_RESULT;
-
-  /// Allocates up to 'max_bytes' buffers to read the data from 'range' into 
and schedules
-  /// the range. Called after StartScanRange() or GetNextUnstartedRange() 
returns
-  /// *needs_buffers=true.
-  ///
-  /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >=
-  /// min_read_buffer_size() so that at least one buffer can be allocated. The 
caller
-  /// must ensure that 'bp_client' has at least 'max_bytes' unused 
reservation. Returns ok
-  /// if the buffers were successfully allocated and the range was scheduled.
-  ///
-  /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * 
max_buffer_size()
-  /// will typically maximize I/O throughput. See the "Buffer Management" 
section of
-  /// the class comment for explanation.
-  Status AllocateBuffersForRange(RequestContext* reader,
-      BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t 
max_bytes);
+  /// Returns the next unstarted scan range for this reader. When the range is 
returned,
+  /// the disk threads in the IoMgr will already have started reading from it. 
The
+  /// caller is expected to call ScanRange::GetNext on the returned range.
+  /// If there are no more unstarted ranges, nullptr is returned.
+  /// This call is blocking.
+  Status GetNextRange(RequestContext* reader, ScanRange** range) 
WARN_UNUSED_RESULT;
+
+  /// Reads the range and returns the result in buffer.
+  /// This behaves like the typical synchronous read() api, blocking until the 
data
+  /// is read. This can be called while there are outstanding ScanRanges and is
+  /// thread safe. Multiple threads per reader can call Read() concurrently.
+  /// The range *cannot* have already been added via AddScanRanges().
+  /// This can only be used if the scan range fits in a single IO buffer (i.e. 
is smaller
+  /// than max_read_buffer_size()) or if reading into a client-provided buffer.
+  Status Read(RequestContext* reader, ScanRange* range,
+      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
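For a small range that was not added via AddScanRanges(), the synchronous form is a single call (sketch; 'reader' and 'range' as above, buffer accessors assumed):

    std::unique_ptr<BufferDescriptor> buffer;
    RETURN_IF_ERROR(io_mgr->Read(reader.get(), range, &buffer));
    // ... consume buffer->buffer() / buffer->len(), then:
    io_mgr->ReturnBuffer(std::move(buffer));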
+
+  /// Returns the buffer to the IoMgr. This must be called for every buffer
+  /// returned by GetNext()/Read() that did not return an error. This is 
non-blocking.
+  /// After calling this, the buffer descriptor is invalid and cannot be 
accessed.
+  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
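Taken together, the asynchronous read path for one reader is roughly the following. ConsumeRange() stands in for the ScanRange::GetNext()/ReturnBuffer() loop sketched in the class comment, and 'query_mem_tracker' is the reader's MemTracker described at RegisterContext().

    std::unique_ptr<RequestContext> reader = io_mgr->RegisterContext(query_mem_tracker);
    RETURN_IF_ERROR(io_mgr->AddScanRanges(reader.get(), ranges));
    while (true) {
      ScanRange* range;
      RETURN_IF_ERROR(io_mgr->GetNextRange(reader.get(), &range));
      if (range == nullptr) break;   // no more unstarted ranges
      ConsumeRange(range);           // placeholder per-range GetNext()/ReturnBuffer() loop
    }
    io_mgr->UnregisterContext(reader.get());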
 
   /// Determine which disk queue this file should be assigned to.  Returns an 
index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that 
holds the
@@ -314,8 +287,32 @@ class DiskIoMgr : public CacheLineAligned {
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);
 
-  int64_t min_buffer_size() const { return min_buffer_size_; }
-  int64_t max_buffer_size() const { return max_buffer_size_; }
+  /// TODO: The functions below can be moved to RequestContext.
+  /// Returns the current status of the context.
+  Status context_status(RequestContext* context) const WARN_UNUSED_RESULT;
+
+  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
+  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
+  void set_open_file_timer(RequestContext*, RuntimeProfile::Counter*);
+  void set_active_read_thread_counter(RequestContext*, 
RuntimeProfile::Counter*);
+  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
+
+  int64_t queue_size(RequestContext* reader) const;
+  int64_t bytes_read_local(RequestContext* reader) const;
+  int64_t bytes_read_short_circuit(RequestContext* reader) const;
+  int64_t bytes_read_dn_cache(RequestContext* reader) const;
+  int num_remote_ranges(RequestContext* reader) const;
+  int64_t unexpected_remote_bytes(RequestContext* reader) const;
+  int cached_file_handles_hit_count(RequestContext* reader) const;
+  int cached_file_handles_miss_count(RequestContext* reader) const;
+
+  /// Returns the read throughput across all readers.
+  /// TODO: should this be a sliding window?  This should report metrics for 
the
+  /// last minute, hour and since the beginning.
+  int64_t GetReadThroughput();
+
+  /// Returns the maximum read buffer size
+  int max_read_buffer_size() const { return max_buffer_size_; }
 
   /// Returns the total number of disk queues (both local and remote).
   int num_total_disks() const { return disk_queues_.size(); }
@@ -368,6 +365,29 @@ class DiskIoMgr : public CacheLineAligned {
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, 
int64_t mtime,
       RequestContext* reader, CachedHdfsFileHandle** fid);
 
+  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the 
buffers if
+  /// 'bytes_to_free' is -1.
+  void GcIoBuffers(int64_t bytes_to_free = -1);
+
+  /// The maximum number of ready buffers that can be queued in a scan range. 
Having two
+  /// queued buffers (plus the buffer that is returned to the client) gives 
good
+  /// performance in most scenarios:
+  /// 1. If the consumer is consuming data faster than we can read from disk, 
then the
+  ///    queue will be empty most of the time because the buffer will be 
immediately
+  ///    pulled off the queue as soon as it is added. There will always be an 
I/O request
+  ///    in the disk queue to maximize I/O throughput, which is the bottleneck 
in this
+  ///    case.
+  /// 2. If we can read from disk faster than the consumer is consuming data, 
the queue
+  ///    will fill up and there will always be a buffer available for the 
consumer to
+  ///    read, so the consumer will not block and we maximize consumer 
throughput, which
+  ///    is the bottleneck in this case.
+  /// 3. If the consumer is consuming data at approximately the same rate as 
we are
+  ///    reading from disk, then the steady state is that the consumer is 
processing one
+  ///    buffer and one buffer is in the disk queue. The additional buffer can 
absorb
+  ///    bursts where the producer runs faster than the consumer or the 
consumer runs
+  ///    faster than the producer without blocking either the producer or 
consumer.
+  static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
+
   /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
   /// disk ID (i.e. disk_queue_ index) of num_local_disks().
   enum {
@@ -377,10 +397,6 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_NUM_DISKS
   };
 
-  /// The ideal number of max-sized buffers per scan range to maximise 
throughput.
-  /// See "Buffer Management" in the class comment for explanation.
-  static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;
-
  private:
   friend class BufferDescriptor;
   friend class RequestContext;
@@ -389,9 +405,16 @@ class DiskIoMgr : public CacheLineAligned {
   struct DiskQueue;
 
   friend class DiskIoMgrTest_Buffers_Test;
-  friend class DiskIoMgrTest_BufferSizeSelection_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
 
+  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
+  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
+
+  /// Memory tracker for I/O buffers where the RequestContext has no 
MemTracker.
+  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where 
readers don't
+  /// provide a MemTracker.
+  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
+
   /// Number of worker(read) threads per rotational disk. Also the max depth 
of queued
   /// work to the disk.
   const int num_io_threads_per_rotational_disk_;
@@ -401,10 +424,10 @@ class DiskIoMgr : public CacheLineAligned {
   const int num_io_threads_per_solid_state_disk_;
 
   /// Maximum read size. This is also the maximum size of each allocated 
buffer.
-  const int64_t max_buffer_size_;
+  const int max_buffer_size_;
 
-  /// The minimum size of each read buffer. Must be >= 
BufferPool::min_buffer_len().
-  const int64_t min_buffer_size_;
+  /// The minimum size of each read buffer.
+  const int min_buffer_size_;
 
   /// Thread group containing all the worker threads.
   ThreadGroup disk_thread_group_;
@@ -422,6 +445,28 @@ class DiskIoMgr : public CacheLineAligned {
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter read_timer_;
 
+  /// Protects free_buffers_
+  boost::mutex free_buffers_lock_;
+
+  /// Free buffers that can be handed out to clients. There is one list for 
each buffer
+  /// size, indexed by the Log2 of the buffer size in units of 
min_buffer_size_. The
+  /// maximum buffer size is max_buffer_size_, so the maximum index is
+  /// Log2(max_buffer_size_ / min_buffer_size_).
+  //
+  /// E.g. if min_buffer_size_ = 1024 bytes:
+  ///  free_buffers_[0]  => list of free buffers with size 1024 B
+  ///  free_buffers_[1]  => list of free buffers with size 2048 B
+  ///  free_buffers_[10] => list of free buffers with size 1 MB
+  ///  free_buffers_[13] => list of free buffers with size 8 MB
+  ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
+  std::vector<std::deque<uint8_t*>> free_buffers_;
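Under this scheme the index is just the base-2 logarithm of the size ratio. A sketch of the free_buffers_idx() helper declared below, assuming 'buffer_size' is already a power-of-two multiple of min_buffer_size_ (the real implementation may round differently):

    int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
      // E.g. with min_buffer_size_ = 1024: 1024 -> 0, 2048 -> 1, 8 MB -> 13.
      int idx = 0;
      for (int64_t size = min_buffer_size_; size < buffer_size; size <<= 1) ++idx;
      return idx;
    }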
+
+  /// Total number of allocated buffers, used for debugging.
+  AtomicInt32 num_allocated_buffers_;
+
+  /// Total number of buffers in readers
+  AtomicInt32 num_buffers_in_readers_;
+
   /// Per disk queues. This is static and created once at Init() time.  One 
queue is
   /// allocated for each local disk on the system and for each remote 
filesystem type.
   /// It is indexed by disk id.
@@ -437,6 +482,23 @@ class DiskIoMgr : public CacheLineAligned {
   // handles are closed.
   FileHandleCache file_handle_cache_;
 
+  /// Returns the index into free_buffers_ for a given buffer size
+  int free_buffers_idx(int64_t buffer_size);
+
+  /// Returns a buffer to read into with size between 'buffer_size' and
+  /// 'max_buffer_size_'. If there is an appropriately-sized free buffer in
+  /// 'free_buffers_', that is returned; otherwise a new one is allocated.
+  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
+  /// The buffer memory is tracked against reader's mem tracker, or
+  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
+  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
+      RequestContext* reader, ScanRange* range, int64_t buffer_size);
+
+  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be 
nullptr), either
+  /// freeing it or returning it to 'free_buffers_'. Memory tracking is 
updated to
+  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk 
I/O mgr.
+  void FreeBufferMemory(BufferDescriptor* desc);
+
   /// Disk worker thread loop. This function retrieves the next range to 
process on
   /// the disk queue and invokes ReadRange() or Write() depending on the type 
of Range().
   /// There can be multiple threads per disk running this loop.
@@ -450,12 +512,10 @@ class DiskIoMgr : public CacheLineAligned {
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       RequestContext** request_context);
 
-  /// Updates disk queue and reader state after a read is complete. If the read
-  /// was successful, 'read_status' is ok and 'buffer' contains the result of 
the
-  /// read. If the read failed with an error, 'read_status' contains the error 
and
-  /// 'buffer' has the buffer that was meant to hold the result of the read.
+  /// Updates disk queue and reader state after a read is complete. The read 
result
+  /// is captured in the buffer descriptor.
   void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-      Status read_status, std::unique_ptr<BufferDescriptor> buffer);
+      std::unique_ptr<BufferDescriptor> buffer);
 
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write 
OK/RUNTIME_ERROR
@@ -478,14 +538,15 @@ class DiskIoMgr : public CacheLineAligned {
   /// Does not open or close the file that is written.
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) 
WARN_UNUSED_RESULT;
 
-  /// Reads the specified scan range and calls HandleReadFinished() when done. 
If no
-  /// buffer is available to read the range's data into, the read cannot 
proceed, the
-  /// range becomes blocked and this function returns without doing I/O.
+  /// Reads the specified scan range and calls HandleReadFinished when done.
   void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* 
range);
 
-  /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a 
scan range
-  /// with length 'scan_range_len', given that 'max_bytes' of memory should be 
allocated.
-  std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t 
max_bytes);
+  /// Try to allocate the next buffer for the scan range, returning the new 
buffer
+  /// if successful. If 'reader' is cancelled, cancels the range and returns 
nullptr.
+  /// If there is memory pressure and buffers are already queued, adds the 
range
+  /// to the blocked ranges and returns nullptr.
+  std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* 
disk_queue,
+      RequestContext* reader, ScanRange* range, int64_t buffer_size);
 };
 }
 }
