http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc 
b/be/src/runtime/io/request-context.cc
index 287f53a..dec6aa6 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -17,74 +17,122 @@
 
 #include "runtime/io/disk-io-mgr-internal.h"
 
+#include "runtime/exec-env.h"
+
 #include "common/names.h"
 
 using namespace impala;
 using namespace impala::io;
 
-void RequestContext::Cancel(const Status& status) {
-  DCHECK(!status.ok());
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+    int64_t buffer_len)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    scan_range_(scan_range),
+    buffer_(buffer),
+    buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+}
 
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+    ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+    BufferPool::BufferHandle handle) :
+  io_mgr_(io_mgr),
+  reader_(reader),
+  scan_range_(scan_range),
+  buffer_(handle.data()),
+  buffer_len_(handle.len()),
+  bp_client_(bp_client),
+  handle_(move(handle)) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(bp_client_->is_registered());
+  DCHECK(handle_.is_open());
+}
+
+void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
+  DCHECK(buffer->buffer_ != nullptr);
+  if (!buffer->is_cached() && !buffer->is_client_buffer()) {
+    // Only buffers that were allocated by DiskIoMgr need to be freed.
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+        buffer->bp_client_, &buffer->handle_);
+  }
+  buffer->buffer_ = nullptr;
+}
+
+// Cancellation of a RequestContext requires coordination from multiple 
threads that may
+// hold references to the context:
+//  1. Disk threads that are currently processing a range for this context.
+//  2. Caller threads that are waiting in GetNext().
+//
+// Each thread that currently has a reference to the request context must 
notice the
+// cancel, cancel any pending operations involving the context and remove the 
context from
+// tracking structures. Once no more operations are pending on the context and 
no more
+// I/O mgr threads hold references to the context, the context can be marked 
inactive
+// (see CancelAndMarkInactive()), after which the owner of the context object 
can free
+// it.
+//
+// The steps are:
+// 1. Cancel() will immediately set the context in the Cancelled state. This 
prevents any
+// other thread from adding more ready buffers to the context (they all take a 
lock and
+// check the state before doing so), or any write ranges to the context.
+// 2. Cancel() will call Cancel() on each ScanRange that is not yet complete, 
unblocking
+// any threads in GetNext(). If there was no prior error for a scan range, any 
reads from
+// that scan range will return a CANCELLED Status. Cancel() also invokes 
callbacks for
+// all WriteRanges with a CANCELLED Status.
+// 3. Disk threads notice the context is cancelled either when picking the 
next context
+// to process or when they try to enqueue a ready buffer. Upon noticing the 
cancelled
+// state, removes the context from the disk queue. The last thread per disk 
then calls
+// DecrementDiskRefCount(). After the last disk thread has called 
DecrementDiskRefCount(),
+// cancellation is done and it is safe to unregister the context.
+void RequestContext::Cancel() {
   // Callbacks are collected in this vector and invoked while no lock is held.
   vector<WriteRange::WriteDoneCallback> write_callbacks;
   {
-    lock_guard<mutex> lock(lock_);
+    unique_lock<mutex> lock(lock_);
     DCHECK(Validate()) << endl << DebugString();
 
     // Already being cancelled
     if (state_ == RequestContext::Cancelled) return;
 
-    DCHECK(status_.ok());
-    status_ = status;
-
     // The reader will be put into a cancelled state until call cleanup is 
complete.
     state_ = RequestContext::Cancelled;
 
-    // Cancel all scan ranges for this reader. Each range could be one one of
-    // four queues.
-    for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
-      RequestRange* range = NULL;
-      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
-        if (range->request_type() == RequestType::READ) {
-          static_cast<ScanRange*>(range)->Cancel(status);
-        } else {
-          DCHECK(range->request_type() == RequestType::WRITE);
+    // Clear out all request ranges from queues for this reader. Cancel the 
scan
+    // ranges and invoke the write range callbacks to propagate the 
cancellation.
+    for (ScanRange* range : active_scan_ranges_) 
range->CancelInternal(Status::CANCELLED);
+    active_scan_ranges_.clear();
+    for (PerDiskState& disk_state : disk_states_) {
+      RequestRange* range;
+      while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) {
+        if (range->request_type() == RequestType::WRITE) {
           
write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
         }
       }
-
-      ScanRange* scan_range;
-      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
-        scan_range->Cancel(status);
-      }
+      while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr);
       WriteRange* write_range;
-      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != 
NULL) {
+      while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != 
nullptr) {
         write_callbacks.push_back(write_range->callback_);
       }
     }
+    // Clear out the lists of scan ranges.
+    while (ready_to_start_ranges_.Dequeue() != nullptr);
+    while (cached_ranges_.Dequeue() != nullptr);
 
-    ScanRange* range = NULL;
-    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
-    }
-    while ((range = blocked_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
-    }
-    while ((range = cached_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
-    }
-
-    // Schedule reader on all disks. The disks will notice it is cancelled and 
do any
-    // required cleanup
+    // Ensure that the reader is scheduled on all disks (it may already be 
scheduled on
+    // some). The disk threads will notice that the context is cancelled and 
do any
+    // required cleanup for the disk state.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
-      state.ScheduleContext(this, i);
+      disk_states_[i].ScheduleContext(lock, this, i);
     }
   }
 
   for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
-    write_callback(status_);
+    write_callback(Status::CANCELLED);
   }
 
   // Signal reader and unblock the GetNext/Read thread.  That read will fail 
with
@@ -93,7 +141,7 @@ void RequestContext::Cancel(const Status& status) {
 }
 
 void RequestContext::CancelAndMarkInactive() {
-  Cancel(Status::CANCELLED);
+  Cancel();
 
   boost::unique_lock<boost::mutex> l(lock_);
   DCHECK_NE(state_, Inactive);
@@ -102,54 +150,76 @@ void RequestContext::CancelAndMarkInactive() {
   // Wait until the ranges finish up.
   while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
 
-  // Validate that no buffers were leaked from this context.
-  DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
-  DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+  // Validate that no ranges are active.
+  DCHECK_EQ(0, active_scan_ranges_.size()) << endl << DebugString();
+
+  // Validate that no threads are active and the context is not queued.
+  for (const PerDiskState& disk_state : disk_states_) {
+    DCHECK_EQ(0, disk_state.in_flight_ranges()->size()) << endl << 
DebugString();
+    DCHECK_EQ(0, disk_state.unstarted_scan_ranges()->size()) << endl << 
DebugString();
+    DCHECK_EQ(0, disk_state.num_threads_in_op()) << endl << DebugString();
+    DCHECK(!disk_state.is_on_queue()) << endl << DebugString();
+  }
   DCHECK(Validate()) << endl << DebugString();
   state_ = Inactive;
 }
 
-void RequestContext::AddRequestRange(
-    RequestRange* range, bool schedule_immediately) {
-  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
-  if (state.done()) {
-    DCHECK_EQ(state.num_remaining_ranges(), 0);
-    state.set_done(false);
+void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
+    RequestRange* range, ScheduleMode schedule_mode) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  DCHECK_EQ(state_, Active) << DebugString();
+  PerDiskState* disk_state = &disk_states_[range->disk_id()];
+  if (disk_state->done()) {
+    DCHECK_EQ(disk_state->num_remaining_ranges(), 0);
+    disk_state->set_done(false);
     ++num_disks_with_ranges_;
   }
-
-  bool schedule_context;
   if (range->request_type() == RequestType::READ) {
     ScanRange* scan_range = static_cast<ScanRange*>(range);
-    if (schedule_immediately) {
-      ScheduleScanRange(scan_range);
-    } else {
-      state.unstarted_scan_ranges()->Enqueue(scan_range);
+    if (schedule_mode == ScheduleMode::IMMEDIATELY) {
+      ScheduleScanRange(lock, scan_range);
+    } else if (schedule_mode == ScheduleMode::UPON_GETNEXT) {
+      disk_state->unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
+      // If there's no 'next_scan_range_to_start', schedule this 
RequestContext so that
+      // one of the 'unstarted_scan_ranges' will become the 
'next_scan_range_to_start'.
+      if (disk_state->next_scan_range_to_start() == nullptr) {
+        disk_state->ScheduleContext(lock, this, range->disk_id());
+      }
     }
-    // If next_scan_range_to_start is NULL, schedule this RequestContext so 
that it will
-    // be set. If it's not NULL, this context will be scheduled when 
GetNextRange() is
-    // invoked.
-    schedule_context = state.next_scan_range_to_start() == NULL;
   } else {
     DCHECK(range->request_type() == RequestType::WRITE);
-    DCHECK(!schedule_immediately);
+    DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << 
static_cast<int>(schedule_mode);
     WriteRange* write_range = static_cast<WriteRange*>(range);
-    state.unstarted_write_ranges()->Enqueue(write_range);
+    disk_state->unstarted_write_ranges()->Enqueue(write_range);
 
-    // ScheduleContext() has no effect if the context is already scheduled,
-    // so this is safe.
-    schedule_context = true;
+    // Ensure that the context is scheduled so that the write range gets 
picked up.
+    // ScheduleContext() has no effect if already scheduled, so this is safe 
to do always.
+    disk_state->ScheduleContext(lock, this, range->disk_id());
   }
+  ++disk_state->num_remaining_ranges();
+}
+
+void RequestContext::AddActiveScanRangeLocked(
+    const unique_lock<mutex>& lock, ScanRange* range) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  DCHECK(state_ == Active);
+  active_scan_ranges_.insert(range);
+}
 
-  if (schedule_context) state.ScheduleContext(this, range->disk_id());
-  ++state.num_remaining_ranges();
+void RequestContext::RemoveActiveScanRange(ScanRange* range) {
+  unique_lock<mutex> lock(lock_);
+  RemoveActiveScanRangeLocked(lock, range);
 }
 
-RequestContext::RequestContext(
-    DiskIoMgr* parent, int num_disks, MemTracker* tracker)
-  : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
+void RequestContext::RemoveActiveScanRangeLocked(
+    const unique_lock<mutex>& lock, ScanRange* range) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  active_scan_ranges_.erase(range);
+}
+
+RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+  : parent_(parent), disk_states_(num_disks) {}
 
 // Dumps out request context information. Lock should be taken by caller
 string RequestContext::DebugString() const {
@@ -159,13 +229,9 @@ string RequestContext::DebugString() const {
   if (state_ == RequestContext::Cancelled) ss << "Cancelled";
   if (state_ == RequestContext::Active) ss << "Active";
   if (state_ != RequestContext::Inactive) {
-    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
-       << " #ready_buffers=" << num_ready_buffers_.Load()
-       << " #used_buffers=" << num_used_buffers_.Load()
-       << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
-       << " #finished_scan_ranges=" << num_finished_ranges_.Load()
-       << " #disk_with_ranges=" << num_disks_with_ranges_
-       << " #disks=" << num_disks_with_ranges_;
+    ss << " #disk_with_ranges=" << num_disks_with_ranges_
+       << " #disks=" << num_disks_with_ranges_
+       << " #active scan ranges=" << active_scan_ranges_.size();
     for (int i = 0; i < disk_states_.size(); ++i) {
       ss << endl << "   " << i << ": "
          << "is_on_queue=" << disk_states_[i].is_on_queue()
@@ -188,16 +254,6 @@ bool RequestContext::Validate() const {
     return false;
   }
 
-  if (num_used_buffers_.Load() < 0) {
-    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << 
num_used_buffers_.Load();
-    return false;
-  }
-
-  if (num_ready_buffers_.Load() < 0) {
-    LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << 
num_ready_buffers_.Load();
-    return false;
-  }
-
   int total_unstarted_ranges = 0;
   for (int i = 0; i < disk_states_.size(); ++i) {
     const PerDiskState& state = disk_states_[i];
@@ -275,8 +331,8 @@ bool RequestContext::Validate() const {
       LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
       return false;
     }
-    if (!blocked_ranges_.empty()) {
-      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
+    if (!active_scan_ranges_.empty()) {
+      LOG(WARNING) << "Reader cancelled but has active ranges.";
       return false;
     }
   }
@@ -284,10 +340,11 @@ bool RequestContext::Validate() const {
   return true;
 }
 
-void RequestContext::PerDiskState::ScheduleContext(
+void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& 
context_lock,
     RequestContext* context, int disk_id) {
-  if (!is_on_queue_ && !done_) {
-    is_on_queue_ = true;
+  DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
+  if (is_on_queue_.Load() == 0 && !done_) {
+    is_on_queue_.Store(1);
     context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h 
b/be/src/runtime/io/request-context.h
index fd68669..b028596 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -23,10 +23,13 @@
 
 namespace impala {
 namespace io {
+
+// Mode argument for AddRangeToDisk().
+enum class ScheduleMode {
+  IMMEDIATELY, UPON_GETNEXT, BY_CALLER
+};
 /// A request context is used to group together I/O requests belonging to a 
client of the
-/// I/O manager for management and scheduling. For most I/O manager clients it 
is an
-/// opaque pointer, but some clients may need to include this header, e.g. to 
make the
-/// unique_ptr<DiskIoRequestContext> destructor work correctly.
+/// I/O manager for management and scheduling.
 ///
 /// Implementation Details
 /// ======================
@@ -34,56 +37,109 @@ namespace io {
 /// maintains state across all disks as well as per disk state.
 /// The unit for an IO request is a RequestRange, which may be a ScanRange or a
 /// WriteRange.
-/// A scan range for the reader is on one of five states:
-/// 1) PerDiskState's unstarted_ranges: This range has only been queued
+/// A scan range for the reader is in one of six states:
+/// 1) PerDiskState's 'unstarted_scan_ranges_': This range has only been queued
 ///    and nothing has been read from it.
-/// 2) RequestContext's ready_to_start_ranges_: This range is about to be 
started.
-///    As soon as the reader picks it up, it will move to the in_flight_ranges
+/// 2) RequestContext's 'ready_to_start_ranges_': This range is about to be 
started.
+///    As soon as the reader picks it up, it will move to the 
'in_flight_ranges_'
 ///    queue.
-/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
+/// 3) PerDiskState's 'in_flight_ranges_': This range is being processed and 
will
 ///    be read from the next time a disk thread picks it up in 
GetNextRequestRange()
-/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
-///    anymore. We need the caller to pull a buffer off which will put this in
-///    the in_flight_ranges queue. These ranges are in the RequestContext's
-///    blocked_ranges_ queue.
-/// 5) ScanRange is cached and in the cached_ranges_ queue.
-//
-/// If the scan range is read and does not get blocked on the outgoing queue, 
the
+/// 4) The ScanRange is blocked waiting for buffers because it does not have 
any unused
+///    buffers to read data into. It is unblocked when a client adds new 
buffers via
+///    AllocateBuffersForRange() or returns existing buffers via 
ReturnBuffer().
+///    ScanRanges in this state are identified by 'blocked_on_buffer_' == true.
+/// 5) ScanRange is cached and in the 'cached_ranges_' queue.
+/// 6) Inactive - either all the data for the range was returned or the range 
was
+///    cancelled. I.e. ScanRange::eosr_ is true or ScanRange::cancel_status_ 
!= OK.
+///
+/// If the scan range is read and does not get blocked waiting for buffers, the
 /// transitions are: 1 -> 2 -> 3.
 /// If the scan range does get blocked, the transitions are
 /// 1 -> 2 -> 3 -> (4 -> 3)*
-//
-/// In the case of a cached scan range, the range is immediately put in 
cached_ranges_.
+///
+/// In the case of a cached scan range, the range is immediately put in 
'cached_ranges_'.
 /// When the caller asks for the next range to process, we first pull ranges 
from
-/// the cache_ranges_ queue. If the range was cached, the range is removed and
+/// the 'cached_ranges_' queue. If the range was cached, the range is removed 
and
 /// done (ranges are either entirely cached or not at all). If the cached read 
attempt
 /// fails, we put the range in state 1.
-//
-/// A write range for a context may be in one of two lists:
-/// 1) unstarted_write_ranges_ : Ranges that have been queued but not 
processed.
-/// 2) in_flight_ranges_: The write range is ready to be processed by the next 
disk thread
-///    that picks it up in GetNextRequestRange().
-//
+///
+/// All scan ranges in states 1-5 are tracked in 'active_scan_ranges_' so that 
they can be
+/// cancelled when the RequestContext is cancelled. Scan ranges are removed 
from
+/// 'active_scan_ranges_' during their transition to state 6.
+///
+/// A write range for a context may be in one of two queues:
+/// 1) 'unstarted_write_ranges_': Ranges that have been queued but not 
processed.
+/// 2) 'in_flight_ranges_': The write range is ready to be processed by the 
next disk
+///    thread that picks it up in GetNextRequestRange().
+///
 /// AddWriteRange() adds WriteRanges for a disk.
 /// It is the responsibility of the client to pin the data to be written via a 
WriteRange
 /// in memory. After a WriteRange has been written, a callback is invoked to 
inform the
 /// client that the write has completed.
-//
+///
 /// An important assumption is that write does not exceed the maximum read 
size and that
 /// the entire range is written when the write request is handled. (In other 
words, writes
 /// are not broken up.)
-//
+///
 /// When a RequestContext is processed by a disk thread in 
GetNextRequestRange(),
 /// a write range is always removed from the list of unstarted write ranges 
and appended
 /// to the in_flight_ranges_ queue. This is done to alternate reads and writes 
- a read
-/// that is scheduled (by calling GetNextRange()) is always followed by a 
write (if one
-/// exists).  And since at most one WriteRange can be present in 
in_flight_ranges_ at any
-/// time (once a write range is returned from GetNetxRequestRange() it is 
completed an
-/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can 
be queued up
-/// behind at most one write range.
+/// that is scheduled (by calling GetNextUnstartedRange()) is always followed 
by a write
+/// (if one exists). And since at most one WriteRange can be present in 
in_flight_ranges_
+/// at any time (once a write range is returned from GetNextRequestRange() it 
is completed
+/// and not re-enqueued), a scan range scheduled via a call to 
GetNextUnstartedRange() can
+/// be queued up behind at most one write range.
 class RequestContext {
  public:
-  ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
+  ~RequestContext() {
+    DCHECK_EQ(state_, Inactive) << "Must be unregistered. " << DebugString();
+  }
+
+  /// Cancel the context asynchronously. All outstanding requests are cancelled
+  /// asynchronously. This does not need to be called if the context finishes 
normally.
+  /// Calling GetNext() on any scan ranges belonging to this RequestContext 
will return
+  /// CANCELLED (or another error, if an error was encountered for that scan 
range before
+  /// it is cancelled).
+  void Cancel();
+
+  bool IsCancelled() {
+    boost::unique_lock<boost::mutex> lock(lock_);
+    return state_ == Cancelled;
+  }
+
+  int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
+  int64_t bytes_read_short_circuit() const { return 
bytes_read_short_circuit_.Load(); }
+  int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
+  int num_remote_ranges() const { return num_remote_ranges_.Load(); }
+  int64_t unexpected_remote_bytes() const { return 
unexpected_remote_bytes_.Load(); }
+
+  int cached_file_handles_hit_count() const {
+    return cached_file_handles_hit_count_.Load();
+  }
+
+  int cached_file_handles_miss_count() const {
+    return cached_file_handles_miss_count_.Load();
+  }
+
+  void set_bytes_read_counter(RuntimeProfile::Counter* bytes_read_counter) {
+    bytes_read_counter_ = bytes_read_counter;
+  }
+
+  void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = 
read_timer; }
+
+  void set_open_file_timer(RuntimeProfile::Counter* open_file_timer) {
+    open_file_timer_ = open_file_timer;
+  }
+
+  void set_active_read_thread_counter(
+      RuntimeProfile::Counter* active_read_thread_counter) {
+   active_read_thread_counter_ = active_read_thread_counter;
+  }
+
+  void set_disks_accessed_bitmap(RuntimeProfile::Counter* 
disks_accessed_bitmap) {
+    disks_accessed_bitmap_ = disks_accessed_bitmap;
+  }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(RequestContext);
@@ -106,13 +162,19 @@ class RequestContext {
     Inactive,
   };
 
-  RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
+  RequestContext(DiskIoMgr* parent, int num_disks);
+
+  /// Cleans up a buffer. If the buffer was allocated with 
AllocateBuffersForRange(),
+  /// frees the buffer. Otherwise (e.g. a client or HDFS cache buffer), just 
prepares the
+  /// descriptor to be destroyed. After this is called, buffer->buffer() is 
NULL.
+  /// Does not acquire 'lock_'.
+  void FreeBuffer(BufferDescriptor* buffer);
 
   /// Decrements the number of active disks for this reader.  If the disk count
   /// goes to 0, the disk complete condition variable is signaled.
-  /// Reader lock must be taken before this call.
-  void DecrementDiskRefCount() {
-    // boost doesn't let us dcheck that the reader lock is taken
+  /// 'lock_' must be held via 'lock'.
+  void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock) {
+    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
     DCHECK_GT(num_disks_with_ranges_, 0);
     if (--num_disks_with_ranges_ == 0) {
       disks_complete_cond_var_.NotifyAll();
@@ -129,25 +191,48 @@ class RequestContext {
 
   /// Adds range to in_flight_ranges, scheduling this reader on the disk 
threads
   /// if necessary.
-  /// Reader lock must be taken before this.
-  void ScheduleScanRange(ScanRange* range) {
+  /// 'lock_' must be held via 'lock'. Only valid to call if this context is 
active.
+  void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, 
ScanRange* range) {
+    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
     DCHECK_EQ(state_, Active);
-    DCHECK(range != NULL);
+    DCHECK(range != nullptr);
     RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
     state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(this, range->disk_id());
+    state.ScheduleContext(lock, this, range->disk_id());
   }
 
-  /// Cancels the context with status code 'status'
-  void Cancel(const Status& status);
-
   /// Cancel the context if not already cancelled, wait for all scan ranges to 
finish
   /// and mark the context as inactive, after which it cannot be used.
   void CancelAndMarkInactive();
 
-  /// Adds request range to disk queue for this request context. Currently,
-  /// schedule_immediately must be false is RequestRange is a write range.
-  void AddRequestRange(RequestRange* range, bool schedule_immediately);
+  /// Adds a request range to the appropriate disk state. 'schedule_mode' 
controls which
+  /// queue the range is placed in. This RequestContext is scheduled on the 
disk state
+  /// if required by 'schedule_mode'.
+  ///
+  /// Write ranges must always have 'schedule_mode' IMMEDIATELY and are added 
to the
+  /// 'unstarted_write_ranges_' queue, from which they will be asynchronously 
moved to the
+  /// 'in_flight_ranges_' queue.
+  ///
+  /// Scan ranges can have different 'schedule_mode' values. If IMMEDIATELY, 
the range is
+  /// immediately added to the 'in_flight_ranges_' queue where it will be 
processed
+  /// asynchronously by disk threads. If UPON_GETNEXT, the range is added to 
the
+  /// 'unstarted_scan_ranges_' queue, from which it can be returned to a client by
+  /// DiskIoMgr::GetNextUnstartedRange(). If BY_CALLER, the scan range is not 
added to
+  /// any queues. The range will be scheduled later as a separate step, e.g. 
when it is
+  /// unblocked by adding buffers to it. Caller must hold 'lock_' via 'lock'.
+  void AddRangeToDisk(const boost::unique_lock<boost::mutex>& lock, 
RequestRange* range,
+      ScheduleMode schedule_mode);
+
+  /// Adds an active range to 'active_scan_ranges_'
+  void AddActiveScanRangeLocked(
+      const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+
+  /// Removes the range from 'active_scan_ranges_'. Called by ScanRange after 
eos or
+  /// cancellation. If calling the Locked version, the caller must hold
+  /// 'lock_'. Otherwise the function will acquire 'lock_'.
+  void RemoveActiveScanRange(ScanRange* range);
+  void RemoveActiveScanRangeLocked(
+      const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
 
   /// Validates invariants of reader.  Reader lock must be taken beforehand.
   bool Validate() const;
@@ -158,9 +243,6 @@ class RequestContext {
   /// Parent object
   DiskIoMgr* const parent_;
 
-  /// Memory used for this reader.  This is unowned by this object.
-  MemTracker* const mem_tracker_;
-
   /// Total bytes read for this reader
   RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
 
@@ -190,13 +272,6 @@ class RequestContext {
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
 
-  /// The number of buffers that have been returned to the reader (via 
GetNext) that the
-  /// reader has not returned. Only included for debugging and diagnostics.
-  AtomicInt32 num_buffers_in_reader_{0};
-
-  /// The number of scan ranges that have been completed for this reader.
-  AtomicInt32 num_finished_ranges_{0};
-
   /// The number of scan ranges that required a remote read, updated at the 
end of each
   /// range scan. Only used for diagnostics.
   AtomicInt32 num_remote_ranges_{0};
@@ -211,17 +286,6 @@ class RequestContext {
   /// Total number of file handle opens where the file handle was not in the 
cache
   AtomicInt32 cached_file_handles_miss_count_{0};
 
-  /// The number of buffers that are being used for this reader. This is the 
sum
-  /// of all buffers in ScanRange queues and buffers currently being read into 
(i.e. about
-  /// to be queued). This includes both IOMgr-allocated buffers and 
client-provided
-  /// buffers.
-  AtomicInt32 num_used_buffers_{0};
-
-  /// The total number of ready buffers across all ranges.  Ready buffers are 
buffers
-  /// that have been read from disk but not retrieved by the caller.
-  /// This is the sum of all queued buffers in all ranges for this reader 
context.
-  AtomicInt32 num_ready_buffers_{0};
-
   /// All fields below are accessed by multiple threads and the lock needs to 
be
   /// taken before accessing them. Must be acquired before ScanRange::lock_ if 
both
   /// are held simultaneously.
@@ -230,8 +294,16 @@ class RequestContext {
   /// Current state of the reader
   State state_ = Active;
 
-  /// Status of this reader.  Set to non-ok if cancelled.
-  Status status_;
+  /// Scan ranges that have been added to the IO mgr for this context. Ranges 
can only
+  /// be added when 'state_' is Active. When this context is cancelled, 
Cancel() is
+  /// called for all the active ranges. If a client attempts to add a range 
while
+  /// 'state_' is Cancelled, the range is not added to this list and 
Status::CANCELLED
+  /// is returned to the client. This ensures that all active ranges are 
cancelled as a
+  /// result of RequestContext cancellation.
+  /// Ranges can be cancelled or hit eos non-atomically with their removal 
from this set,
+  /// so eos or cancelled ranges may be temporarily present here. Cancelling 
these ranges
+  /// a second time or cancelling after eos is safe and has no effect.
+  boost::unordered_set<ScanRange*> active_scan_ranges_;
 
   /// The number of disks with scan ranges remaining (always equal to the sum 
of
   /// disks with ranges).
@@ -240,21 +312,18 @@ class RequestContext {
   /// This is the list of ranges that are expected to be cached on the DN.
   /// When the reader asks for a new range (GetNextScanRange()), we first
   /// return ranges from this list.
-  InternalQueue<ScanRange> cached_ranges_;
+  InternalList<ScanRange> cached_ranges_;
 
   /// A list of ranges that should be returned in subsequent calls to
-  /// GetNextRange.
+  /// GetNextUnstartedRange().
   /// There is a trade-off with when to populate this list.  Populating it on
-  /// demand means consumers need to wait (happens in 
DiskIoMgr::GetNextRange()).
+  /// demand means consumers need to wait (happens in 
DiskIoMgr::GetNextUnstartedRange()).
   /// Populating it preemptively means we make worse scheduling decisions.
   /// We currently populate one range per disk.
   /// TODO: think about this some more.
-  InternalQueue<ScanRange> ready_to_start_ranges_;
+  InternalList<ScanRange> ready_to_start_ranges_;
   ConditionVariable ready_to_start_ranges_cv_; // used with lock_
 
-  /// Ranges that are blocked due to back pressure on outgoing buffers.
-  InternalQueue<ScanRange> blocked_ranges_;
-
   /// Condition variable for UnregisterContext() to wait for all disks to 
complete
   ConditionVariable disks_complete_cond_var_;
 
@@ -273,21 +342,9 @@ class RequestContext {
       next_scan_range_to_start_ = range;
     }
 
-    /// We need to have a memory barrier to prevent this load from being 
reordered
-    /// with num_threads_in_op(), since these variables are set without the 
reader
-    /// lock taken
-    bool is_on_queue() const {
-      bool b = is_on_queue_;
-      __sync_synchronize();
-      return b;
-    }
+    bool is_on_queue() const { return is_on_queue_.Load() != 0; }
 
-    int num_threads_in_op() const {
-      int v = num_threads_in_op_.Load();
-      // TODO: determine whether this barrier is necessary for any callsites.
-      AtomicUtil::MemoryBarrier();
-      return v;
-    }
+    int num_threads_in_op() const { return num_threads_in_op_.Load(); }
 
     const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
       return &unstarted_scan_ranges_;
@@ -306,26 +363,41 @@ class RequestContext {
     InternalQueue<RequestRange>* in_flight_ranges() { return 
&in_flight_ranges_; }
 
     /// Schedules the request context on this disk if it's not already on the 
queue.
-    /// Context lock must be taken before this.
-    void ScheduleContext(RequestContext* context, int disk_id);
-
-    /// Increment the ref count on reader.  We need to track the number of 
threads per
-    /// reader per disk that are in the unlocked hdfs read code section. This 
is updated
-    /// by multiple threads without a lock so we need to use an atomic int.
-    void IncrementRequestThreadAndDequeue() {
+    /// context->lock_ must be held by the caller via 'context_lock'.
+    void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
+        RequestContext* context, int disk_id);
+
+    /// Increment the count of disk threads that have a reference to this 
context. These
+    /// threads do not hold any locks while reading from HDFS, so we need to 
prevent the
+    /// RequestContext from being destroyed underneath them.
+    ///
+    /// The caller does not need to hold 'lock_', so this can execute 
concurrently with
+    /// itself and DecrementDiskThread().
+    void IncrementDiskThreadAndDequeue() {
+      /// Incrementing 'num_threads_in_op_' first so that there is no window 
when other
+      /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think 
there are no
+      /// references left to this context.
       num_threads_in_op_.Add(1);
-      is_on_queue_ = false;
+      is_on_queue_.Store(0);
     }
 
-    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
-
-    /// Decrement request thread count and do final cleanup if this is the last
-    /// thread. RequestContext lock must be taken before this.
-    void DecrementRequestThreadAndCheckDone(RequestContext* context) {
-      num_threads_in_op_.Add(-1); // Also acts as a barrier.
-      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
-        // This thread is the last one for this reader on this disk, do final 
cleanup
-        context->DecrementDiskRefCount();
+    /// Decrement the count of disk threads with a reference to this context. 
Does final
+    /// cleanup if the context is cancelled and this is the last thread for 
the disk.
+    /// context->lock_ must be held by the caller via 'context_lock'.
+    void DecrementDiskThread(const boost::unique_lock<boost::mutex>& 
context_lock,
+        RequestContext* context) {
+      DCHECK(context_lock.mutex() == &context->lock_ && 
context_lock.owns_lock());
+      num_threads_in_op_.Add(-1);
+
+      if (context->state_ != Cancelled) {
+        DCHECK_EQ(context->state_, Active);
+        return;
+      }
+      // The state is cancelled, check to see if we're the last thread to 
touch the
+      // context on this disk. We need to load 'is_on_queue_' and 
'num_threads_in_op_'
+      // in this order to avoid a race with IncrementDiskThreadAndDequeue().
+      if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && 
!done_) {
+        context->DecrementDiskRefCount(context_lock);
         done_ = true;
       }
     }
@@ -338,7 +410,12 @@ class RequestContext {
     bool done_ = true;
 
     /// For each disk, keeps track if the context is on this disk's queue, 
indicating
-    /// the disk must do some work for this context. The disk needs to do work 
in 4 cases:
+    /// the disk must do some work for this context. 1 means that the context 
is on the
+    /// disk queue, 0 means that it's not on the queue (either because it has 
no ranges
+    /// active for the disk or because a disk thread dequeued the context and 
is
+    /// currently processing a request).
+    ///
+    /// The disk needs to do work in 4 cases:
     ///  1) in_flight_ranges is not empty, the disk needs to read for this 
reader.
     ///  2) next_range_to_start is NULL, the disk needs to prepare a scan 
range to be
     ///     read next.
@@ -349,7 +426,15 @@ class RequestContext {
     /// useful that can be done. If there's nothing useful, the disk queue 
will wake up
     /// and then remove the reader from the queue. Doing this causes thrashing 
of the
     /// threads.
-    bool is_on_queue_ = false;
+    ///
+    /// This variable is important during context cancellation because it 
indicates
+    /// whether a queue has a reference to the context that must be released 
before
+    /// the context is considered unregistered. Atomically set to 0 after
+    /// incrementing 'num_threads_in_op_' when dequeueing so that there is no 
window
+    /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and 
think there
+    /// are no references left to this context.
+    /// TODO: this could be combined with 'num_threads_in_op_' to be a single 
refcount.
+    AtomicInt32 is_on_queue_{0};
 
     /// For each disks, the number of request ranges that have not been fully 
read.
     /// In the non-cancellation path, this will hit 0, and done will be set to 
true
@@ -363,7 +448,7 @@ class RequestContext {
 
     /// Queue of pending IO requests for this disk in the order that they will 
be
     /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextRange(), or when it is added with schedule_immediately = true.
+    /// GetNextUnstartedRange(), or when it is added with schedule_mode == 
IMMEDIATELY.
     /// A WriteRange is added to this queue from unstarted_write_ranges_ for 
each
     /// invocation of GetNextRequestRange() in WorkLoop().
     /// The size of this queue is always less than or equal to 
num_remaining_ranges.
@@ -379,11 +464,11 @@ class RequestContext {
     /// range to ready_to_start_ranges_.
     ScanRange* next_scan_range_to_start_ = nullptr;
 
-    /// For each disk, the number of threads issuing the underlying read/write 
on behalf
-    /// of this context. There are a few places where we release the context 
lock, do some
-    /// work, and then grab the lock again.  Because we don't hold the lock 
for the
-    /// entire operation, we need this ref count to keep track of which thread 
should do
-    /// final resource cleanup during cancellation.
+    /// For each disk, the number of disk threads issuing the underlying 
read/write on
+    /// behalf of this context. There are a few places where we release the 
context lock,
+    /// do some work, and then grab the lock again.  Because we don't hold the 
lock for
+    /// the entire operation, we need this ref count to keep track of which 
thread should
+    /// do final resource cleanup during cancellation.
     /// Only the thread that sees the count at 0 should do the final cleanup.
     AtomicInt32 num_threads_in_op_{0};
 
@@ -392,7 +477,8 @@ class RequestContext {
     /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate 
between reads
     /// and writes. (Otherwise, since next_scan_range_to_start is set
     /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextRange() may result in only reads 
being processed)
+    /// GetNextRequestRange() and GetNextUnstartedRange() may result in only 
reads being
+    /// processed)
     InternalQueue<WriteRange> unstarted_write_ranges_;
   };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h 
b/be/src/runtime/io/request-ranges.h
index 609f8da..7ec0cb6 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -23,14 +23,15 @@
 
 #include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "common/hdfs.h"
 #include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "util/condition-variable.h"
 #include "util/internal-queue.h"
+#include "util/mem-range.h"
 
 namespace impala {
-class MemTracker;
-
 namespace io {
 class DiskIoMgr;
 class RequestContext;
@@ -55,24 +56,19 @@ class BufferDescriptor {
   /// Returns the offset within the scan range that this buffer starts at
   int64_t scan_range_offset() const { return scan_range_offset_; }
 
-  /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
-  /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does 
not
-  /// check memory limits on 'dst': the caller should check the memory limit 
if a
-  /// different memory limit may apply to 'dst'. If the buffer was a 
client-provided
-  /// buffer, transferring is not allowed.
-  /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
-  void TransferOwnership(MemTracker* dst);
-
  private:
   friend class DiskIoMgr;
   friend class ScanRange;
   friend class RequestContext;
 
-  /// Create a buffer descriptor for a new reader, range and data buffer. The 
buffer
-  /// memory should already be accounted against 'mem_tracker'.
+  /// Create a buffer descriptor for a new reader, range and data buffer.
+  BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len);
+
+  /// Create a buffer descriptor allocated from the buffer pool.
   BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
-      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
-      MemTracker* mem_tracker);
+      ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+      BufferPool::BufferHandle handle);
 
   /// Return true if this is a cached buffer owned by HDFS.
   bool is_cached() const;
@@ -86,14 +82,11 @@ class BufferDescriptor {
   /// Reader that this buffer is for.
   RequestContext* const reader_;
 
-  /// The current tracker this buffer is associated with. After initialisation,
-  /// NULL for cached buffers and non-NULL for all other buffers.
-  MemTracker* mem_tracker_;
-
   /// Scan range that this buffer is for. Non-NULL when initialised.
   ScanRange* const scan_range_;
 
-  /// buffer with the read contents
+  /// Buffer for the read contents. Must be set to NULL in 
RequestContext::FreeBuffer()
+  /// before destruction of the descriptor.
   uint8_t* buffer_;
 
   /// length of buffer_. For buffers from cached reads, the length is 0.
@@ -105,10 +98,12 @@ class BufferDescriptor {
   /// true if the current scan range is complete
   bool eosr_ = false;
 
-  /// Status of the read to this buffer. if status is not ok, 'buffer' is 
nullptr
-  Status status_;
-
   int64_t scan_range_offset_ = 0;
+
+  // Handle to an allocated buffer and the client used to allocate the buffer. 
Only used
+  // for non-external buffers.
+  BufferPool::ClientHandle* bp_client_ = nullptr;
+  BufferPool::BufferHandle handle_;
 };
 
 /// The request type, read or write associated with a request range.
@@ -216,11 +211,11 @@ class ScanRange : public RequestRange {
 
   /// Resets this scan range object with the scan range description. The scan 
range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr 
for the
-  /// local filesystem). The scan range must fall within the file bounds 
(offset >= 0
-  /// and offset + len <= file_length). 'disk_id' is the disk queue to add the 
range
-  /// to. If 'expected_local' is true, a warning is generated if the read did 
not
-  /// come from a local disk. 'buffer_opts' specifies buffer management 
options -
-  /// see the DiskIoMgr class comment and the BufferOpts comments for details.
+  /// local filesystem). The scan range must be non-empty and fall within the 
file bounds
+  /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is 
the disk
+  /// queue to add the range to. If 'expected_local' is true, a warning is 
generated if
+  /// the read did not come from a local disk. 'buffer_opts' specifies buffer 
management
+  /// options - see the DiskIoMgr class comment and the BufferOpts comments 
for details.
   /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary 
data.
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int 
disk_id,
       bool expected_local, const BufferOpts& buffer_opts, void* meta_data = 
nullptr);
@@ -236,10 +231,17 @@ class ScanRange : public RequestRange {
   /// Only one thread can be in GetNext() at any time.
   Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
 
-  /// Cancel this scan range. This cleans up all queued buffers and
-  /// wakes up any threads blocked on GetNext().
-  /// Status is the reason the range was cancelled. Must not be ok().
-  /// Status is returned to the user in GetNext().
+  /// Returns the buffer to the scan range. This must be called for every 
buffer
+  /// returned by GetNext(). After calling this, the buffer descriptor is 
invalid
+  /// and cannot be accessed.
+  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Cancel this scan range. This cleans up all queued buffers and wakes up 
any threads
+  /// blocked on GetNext(). Status is a non-ok status with the reason the 
range was
+  /// cancelled, e.g. CANCELLED if the range was cancelled because it was not 
needed, or
+  /// another error if an error was encountered while scanning the range. 
Status is
+  /// returned to any callers of GetNext(). If a thread is currently 
blocked in
+  /// GetNext(), it is woken up.
   void Cancel(const Status& status);
 
   /// return a descriptive string for debug.
@@ -255,18 +257,13 @@ class ScanRange : public RequestRange {
   /// Initialize internal fields
   void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
 
-  /// Enqueues a buffer for this range. This does not block.
-  /// Returns true if this scan range has hit the queue capacity, false 
otherwise.
+  /// Enqueues a ready buffer with valid data for this range. This does not 
block.
   /// The caller passes ownership of buffer to the scan range and it is not
   /// valid to access buffer after this call. The reader lock must be held by 
the
-  /// caller.
-  bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+  /// caller. Returns false if the scan range was cancelled.
+  bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
       std::unique_ptr<BufferDescriptor> buffer);
 
-  /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
-  /// be called with any locks taken.
-  void CleanupQueuedBuffers();
-
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
   bool Validate();
@@ -283,6 +280,10 @@ class ScanRange : public RequestRange {
   /// exclusive use by this scan range. The scan range is the exclusive owner 
of the
   /// file handle, and the file handle is destroyed in Close().
   /// All local OS files are opened using normal OS file APIs.
+  ///
+  /// If an error is encountered during opening, returns a status describing 
the error.
+  /// If the scan range was cancelled, returns the reason for cancellation. 
Otherwise, on
+  /// success, returns OK.
   Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
 
   /// Closes the file for this range. This function only modifies state in 
this range.
@@ -290,6 +291,10 @@ class ScanRange : public RequestRange {
 
   /// Reads from this range into 'buffer', which has length 'buffer_len' 
bytes. Returns
   /// the number of bytes read. The read position in this scan range is 
updated.
+  ///
+  /// If an error is encountered during reading, returns a status describing 
the error.
+  /// If the scan range was cancelled, returns the reason for cancellation. 
Otherwise, on
+  /// success, returns OK.
   Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read,
       bool* eosr) WARN_UNUSED_RESULT;
 
@@ -307,6 +312,58 @@ class ScanRange : public RequestRange {
   Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
       bool* read_succeeded) WARN_UNUSED_RESULT;
 
+  /// Add buffers for the range to read data into and schedule the range if 
blocked.
+  /// If 'returned' is true, the buffers are ones returned from GetNext() that are 
being recycled
+  /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to 
be added.
+  void AddUnusedBuffers(
+      std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
+
+  /// Remove a buffer from 'unused_iomgr_buffers_' and update
+  /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, 
return NULL.
+  /// 'lock_' must be held by the caller via 'scan_range_lock'.
+  std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
+      const boost::unique_lock<boost::mutex>& scan_range_lock);
+
+  /// Get the next buffer for this scan range for a disk thread to read into. 
Returns
+  /// the new buffer if successful.  If no buffers are available, marks the 
range
+  /// as blocked and returns nullptr. Caller must not hold 'lock_'.
+  std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
+
+  /// Cleans up a buffer that was not returned to the client.
+  /// Either ReturnBuffer() or CleanUpBuffer() is called for every 
BufferDescriptor.
+  /// The caller must hold 'lock_' via 'scan_range_lock'.
+  /// This function may acquire 'hdfs_lock_'
+  void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
+      std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Same as CleanUpBuffer() except cleans up multiple buffers and caller 
must not
+  /// hold 'lock_'.
+  void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& 
buffers);
+
+  /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when 
the scan
+  /// range is cancelled or at eos. The caller must hold 'lock_' via 
'scan_range_lock'.
+  void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& 
scan_range_lock);
+
+  /// Same as Cancel() except reader_->lock must be held by the caller.
+  void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
+      const Status& status);
+
+  /// Same as Cancel() except doesn't remove the scan range from
+  /// reader_->active_scan_ranges_. This is invoked by 
RequestContext::Cancel(),
+  /// which removes the range itself to avoid invalidating its 
active_scan_ranges_
+  /// iterator.
+  void CancelInternal(const Status& status);
+
+  /// Marks the scan range as blocked waiting for a buffer. Caller must not 
hold 'lock_'.
+  void SetBlockedOnBuffer();
+
+  /// Returns true if no more buffers will be returned to clients in the 
future,
+  /// either because of hitting eosr or cancellation.
+  bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) 
const {
+    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+    return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
+  }
+
   /// Pointer to caller specified metadata. This is untouched by the io manager
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
@@ -323,6 +380,9 @@ class ScanRange : public RequestRange {
   /// TODO: we can do more with this
   bool expected_local_ = false;
 
+  /// Last modified time of the file associated with the scan range. Set in 
Reset().
+  int64_t mtime_;
+
   /// Total number of bytes read remotely. This is necessary to maintain a 
count of
   /// the number of remote scan ranges. Since IO statistics can be collected 
multiple
   /// times for a scan range, it is necessary to keep some state about whether 
this
@@ -369,6 +429,10 @@ class ScanRange : public RequestRange {
     struct hadoopRzBuffer* cached_buffer_ = nullptr;
   };
 
+  /// The number of buffers that have been returned to a client via GetNext() 
that have
+  /// not yet been returned with ReturnBuffer().
+  AtomicInt32 num_buffers_in_reader_{0};
+
   /// Lock protecting fields below.
   /// This lock should not be taken during Open()/Read()/Close().
   /// If RequestContext::lock_ and this lock need to be held simultaneously,
@@ -378,25 +442,40 @@ class ScanRange : public RequestRange {
   /// Number of bytes read so far for this scan range
   int bytes_read_;
 
-  /// Status for this range. This is non-ok if is_cancelled_ is true.
-  /// Note: an individual range can fail without the RequestContext being
-  /// cancelled. This allows us to skip individual ranges.
-  Status status_;
+  /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. 
These are
+  /// initially populated when the client calls AllocateBuffersForRange() and
+  /// are used to read scanned data into. Buffers are taken from this 
vector for
+  /// every read and added back, if needed, when the client calls 
ReturnBuffer().
+  std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_;
+
+  /// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
+  int64_t unused_iomgr_buffer_bytes_ = 0;
+
+  /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). 
Used to
+  /// infer how many bytes of buffers need to be held onto to read the rest of 
the scan
+  /// range.
+  int64_t iomgr_buffer_bytes_returned_ = 0;
 
   /// If true, the last buffer for this scan range has been queued.
+  /// If this is true and 'ready_buffers_' is empty, then no more buffers will 
be
+  /// returned to the caller by this scan range.
   bool eosr_queued_ = false;
 
-  /// If true, the last buffer for this scan range has been returned.
-  bool eosr_returned_ = false;
+  /// If true, this scan range is not scheduled because a buffer is not 
available for
+  /// the next I/O in the range. This can happen when the scan range is 
initially created
+  /// or if the buffers added to the range have all been filled with data and 
not yet
+  /// returned.
+  bool blocked_on_buffer_ = false;
 
-  /// If true, this scan range has been removed from the reader's 
in_flight_ranges
-  /// queue because the ready_buffers_ queue is full.
-  bool blocked_on_queue_ = false;
+  /// IO buffers that are queued for this scan range. When Cancel() is called
+  /// this is drained by the cancelling thread. I.e. this is always empty if
+  /// 'cancel_status_' is not OK.
+  std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
-  /// IO buffers that are queued for this scan range.
-  /// Condition variable for GetNext
+  /// Condition variable for threads in GetNext() that are waiting for the 
next buffer.
+  /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan 
range is
+  /// cancelled.
   ConditionVariable buffer_ready_cv_;
-  std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
   /// Lock that should be taken during hdfs calls. Only one thread (the disk 
reading
   /// thread) calls into hdfs at a time so this lock does not have performance 
impact.
@@ -406,11 +485,16 @@ class ScanRange : public RequestRange {
   /// If this lock and lock_ need to be taken, lock_ must be taken first.
   boost::mutex hdfs_lock_;
 
-  /// If true, this scan range has been cancelled.
-  bool is_cancelled_ = false;
-
-  /// Last modified time of the file associated with the scan range
-  int64_t mtime_;
+  /// If non-OK, this scan range has been cancelled. This status is the reason 
for
+  /// cancellation - CANCELLED if cancelled without error, or another status 
if an
+  /// error caused cancellation. Note that a range can be cancelled without 
cancelling
+  /// the owning context. This means that ranges can be cancelled or hit 
errors without
+  /// aborting all scan ranges.
+  ///
+  /// Writers must hold both 'lock_' and 'hdfs_lock_'. Readers must hold 
either 'lock_'
+  /// or 'hdfs_lock_'. This prevents the range from being cancelled while any 
thread
+  /// is inside a critical section.
+  Status cancel_status_;
 };
 
 /// Used to specify data to be written to a file and offset.

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index bd89846..4f7c38b 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -42,35 +42,27 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum 
read chunk size to u
 // any time and only one thread will remove from the queue. This is to 
guarantee
 // that buffers are queued and read in file order.
 
-bool ScanRange::EnqueueBuffer(
+bool ScanRange::EnqueueReadyBuffer(
     const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> 
buffer) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
   {
     unique_lock<mutex> scan_range_lock(lock_);
     DCHECK(Validate()) << DebugString();
-    DCHECK(!eosr_returned_);
     DCHECK(!eosr_queued_);
-    if (is_cancelled_) {
-      // Return the buffer, this range has been cancelled
-      if (buffer->buffer_ != nullptr) {
-        io_mgr_->num_buffers_in_readers_.Add(1);
-        reader_->num_buffers_in_reader_.Add(1);
-      }
-      reader_->num_used_buffers_.Add(-1);
-      io_mgr_->ReturnBuffer(move(buffer));
+    if (!cancel_status_.ok()) {
+      // This range has been cancelled, no need to enqueue the buffer.
+      CleanUpBuffer(scan_range_lock, move(buffer));
       return false;
     }
-    reader_->num_ready_buffers_.Add(1);
+    // Clean up any surplus buffers. E.g. we may have allocated too many if 
the file was
+    // shorter than expected.
+    if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock);
     eosr_queued_ = buffer->eosr();
     ready_buffers_.emplace_back(move(buffer));
-
-    DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
-    blocked_on_queue_ = ready_buffers_.size() == 
DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
   }
-
   buffer_ready_cv_.NotifyOne();
-
-  return blocked_on_queue_;
+  return true;
 }
 
 Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
@@ -78,123 +70,225 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* 
buffer) {
   bool eosr;
   {
     unique_lock<mutex> scan_range_lock(lock_);
-    if (eosr_returned_) return Status::OK();
     DCHECK(Validate()) << DebugString();
+    // No more buffers to return - return the cancel status or OK if not 
cancelled.
+    if (all_buffers_returned(scan_range_lock)) return cancel_status_;
 
-    while (ready_buffers_.empty() && !is_cancelled_) {
+    while (ready_buffers_.empty() && cancel_status_.ok()) {
       buffer_ready_cv_.Wait(scan_range_lock);
     }
-
-    if (is_cancelled_) {
-      DCHECK(!status_.ok());
-      return status_;
-    }
+    // Propagate cancellation to the client if it happened while we were waiting.
+    RETURN_IF_ERROR(cancel_status_);
 
     // Remove the first ready buffer from the queue and return it
     DCHECK(!ready_buffers_.empty());
-    DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
     *buffer = move(ready_buffers_.front());
     ready_buffers_.pop_front();
-    eosr_returned_ = (*buffer)->eosr();
     eosr = (*buffer)->eosr();
+    DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString();
   }
 
-  // Update tracking counters. The buffer has now moved from the IoMgr to the
-  // caller.
-  io_mgr_->num_buffers_in_readers_.Add(1);
-  reader_->num_buffers_in_reader_.Add(1);
-  reader_->num_ready_buffers_.Add(-1);
-  reader_->num_used_buffers_.Add(-1);
-  if (eosr) reader_->num_finished_ranges_.Add(1);
-
-  Status status = (*buffer)->status_;
-  if (!status.ok()) {
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status;
-  }
-
-  unique_lock<mutex> reader_lock(reader_->lock_);
+  // Update tracking counters. The buffer has now moved from the IoMgr to the 
caller.
+  if (eosr) reader_->RemoveActiveScanRange(this);
+  num_buffers_in_reader_.Add(1);
+  return Status::OK();
+}
 
-  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
-  if (reader_->state_ == RequestContext::Cancelled) {
-    reader_->blocked_ranges_.Remove(this);
-    Cancel(reader_->status_);
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status_;
-  }
+void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  vector<unique_ptr<BufferDescriptor>> buffers;
+  buffers.emplace_back(move(buffer_desc));
+  AddUnusedBuffers(move(buffers), true);
+}
 
+void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& 
buffers,
+      bool returned) {
+  DCHECK_GT(buffers.size(), 0);
+  // Keep track of whether the range was unblocked in this function. If so, we need
+  // to schedule it so it resumes progress.
+  bool unblocked = false;
   {
-    // Check to see if we can re-schedule a blocked range. Note that 
EnqueueBuffer()
-    // may have been called after we released 'lock_' above so we need to 
re-check
-    // whether the queue is full.
     unique_lock<mutex> scan_range_lock(lock_);
-    if (blocked_on_queue_
-        && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
-        && !eosr_queued_) {
-      blocked_on_queue_ = false;
-      // This scan range was blocked and is no longer, add it to the reader
-      // queue again.
-      reader_->blocked_ranges_.Remove(this);
-      reader_->ScheduleScanRange(this);
+    if (returned) {
+      // Buffers were in reader but now aren't.
+      num_buffers_in_reader_.Add(-buffers.size());
+    }
+
+    for (unique_ptr<BufferDescriptor>& buffer : buffers) {
+      // We should not hold onto the buffers in the following cases:
+      // 1. the scan range is using external buffers, e.g. cached buffers.
+      // 2. the scan range is cancelled
+      // 3. the scan range already hit eosr
+      // 4. we already have enough buffers to read the remainder of the scan 
range.
+      if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
+          || !cancel_status_.ok()
+          || eosr_queued_
+          || unused_iomgr_buffer_bytes_ >= len_ - 
iomgr_buffer_bytes_returned_) {
+        CleanUpBuffer(scan_range_lock, move(buffer));
+      } else {
+        unused_iomgr_buffer_bytes_ += buffer->buffer_len();
+        unused_iomgr_buffers_.emplace_back(move(buffer));
+        if (blocked_on_buffer_) {
+          blocked_on_buffer_ = false;
+          unblocked = true;
+        }
+      }
     }
   }
-  return Status::OK();
+  // Must drop the ScanRange lock before acquiring the RequestContext lock.
+  if (unblocked) {
+    unique_lock<mutex> reader_lock(reader_->lock_);
+    // Reader may have been cancelled after we dropped 'scan_range_lock' above.
+    if (reader_->state_ == RequestContext::Cancelled) {
+      DCHECK(!cancel_status_.ok());
+    } else {
+      reader_->ScheduleScanRange(reader_lock, this);
+    }
+  }
+}
+
+unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
+    const unique_lock<mutex>& scan_range_lock) {
+  DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
+  if (unused_iomgr_buffers_.empty()) return nullptr;
+  unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back());
+  unused_iomgr_buffers_.pop_back();
+  unused_iomgr_buffer_bytes_ -= result->buffer_len();
+  return result;
+}
+
+unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
+  unique_lock<mutex> lock(lock_);
+  unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
+  if (buffer_desc == nullptr) {
+    blocked_on_buffer_ = true;
+  } else {
+    iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
+  }
+  return buffer_desc;
+}
+
+void ScanRange::SetBlockedOnBuffer() {
+  unique_lock<mutex> lock(lock_);
+  blocked_on_buffer_ = true;
+}
+
+void ScanRange::CleanUpBuffer(
+    const boost::unique_lock<boost::mutex>& scan_range_lock,
+    unique_ptr<BufferDescriptor> buffer_desc) {
+  DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
+  DCHECK(buffer_desc != nullptr);
+  DCHECK_EQ(this, buffer_desc->scan_range_);
+  buffer_desc->reader_->FreeBuffer(buffer_desc.get());
+
+  if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() 
== 0) {
+    // Close the scan range if there are no more buffers in the reader and no 
more buffers
+    // will be returned to readers in future. Close() is idempotent so it is 
ok to call
+    // multiple times during cleanup so long as the range is actually finished.
+    Close();
+  }
+}
+
+void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) 
{
+  unique_lock<mutex> lock(lock_);
+  for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, 
move(buffer));
+}
+
+void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& 
scan_range_lock) {
+  while (!unused_iomgr_buffers_.empty()) {
+    CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock));
+  }
 }
 
 void ScanRange::Cancel(const Status& status) {
   // Cancelling a range that was never started, ignore.
   if (io_mgr_ == nullptr) return;
+  CancelInternal(status);
+  reader_->RemoveActiveScanRange(this);
+}
 
+void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& 
reader_lock,
+    const Status& status) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  CancelInternal(status);
+  reader_->RemoveActiveScanRangeLocked(reader_lock, this);
+}
+
+void ScanRange::CancelInternal(const Status& status) {
+  DCHECK(io_mgr_ != nullptr);
   DCHECK(!status.ok());
   {
-    // Grab both locks to make sure that all working threads see is_cancelled_.
+    // Grab both locks to make sure that we don't change 'cancel_status_' 
while other
+    // threads are in critical sections.
     unique_lock<mutex> scan_range_lock(lock_);
-    unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    DCHECK(Validate()) << DebugString();
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-    status_ = status;
+    {
+      unique_lock<mutex> hdfs_lock(hdfs_lock_);
+      DCHECK(Validate()) << DebugString();
+      // If already cancelled, preserve the original reason for cancellation. 
The first
+      // thread to set 'cancel_status_' does the cleanup below.
+      RETURN_VOID_IF_ERROR(cancel_status_);
+      cancel_status_ = status;
+    }
+
+    // Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
+    // from seeing inconsistent state.
+    while (!ready_buffers_.empty()) {
+      CleanUpBuffer(scan_range_lock, move(ready_buffers_.front()));
+      ready_buffers_.pop_front();
+    }
+
+    // Clean up buffers that we don't need any more because we won't read any more data.
+    CleanUpUnusedBuffers(scan_range_lock);
   }
   buffer_ready_cv_.NotifyAll();
-  CleanupQueuedBuffers();
 
   // For cached buffers, we can't close the range until the cached buffer is 
returned.
-  // Close() is called from DiskIoMgr::ReturnBuffer().
+  // Close() is called from ScanRange::CleanUpBuffer().
   if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
 }
 
-void ScanRange::CleanupQueuedBuffers() {
-  DCHECK(is_cancelled_);
-  io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
-  reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
-  reader_->num_used_buffers_.Add(-ready_buffers_.size());
-  reader_->num_ready_buffers_.Add(-ready_buffers_.size());
-
-  while (!ready_buffers_.empty()) {
-    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
-    ready_buffers_.pop_front();
-  }
-}
-
 string ScanRange::DebugString() const {
   stringstream ss;
   ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
      << " len=" << len_ << " bytes_read=" << bytes_read_
+     << " cancel_status=" << cancel_status_.GetDetail()
      << " buffer_queue=" << ready_buffers_.size()
+     << " num_buffers_in_readers=" << num_buffers_in_reader_.Load()
+     << " unused_iomgr_buffers=" << unused_iomgr_buffers_.size()
+     << " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_
+     << " blocked_on_buffer=" << blocked_on_buffer_
      << " hdfs_file=" << exclusive_hdfs_fh_;
   return ss.str();
 }
 
 bool ScanRange::Validate() {
   if (bytes_read_ > len_) {
-    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the 
scan range."
+    LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan 
range."
                  << " bytes_read_=" << bytes_read_ << " len_=" << len_;
     return false;
   }
-  if (eosr_returned_ && !eosr_queued_) {
-    LOG(WARNING) << "Returned eosr to reader before finishing reading the scan 
range"
-                 << " eosr_returned_=" << eosr_returned_
-                 << " eosr_queued_=" << eosr_queued_;
+  if (!cancel_status_.ok() && !ready_buffers_.empty()) {
+    LOG(ERROR) << "Cancelled range should not have queued buffers " << 
DebugString();
+    return false;
+  }
+  int64_t unused_iomgr_buffer_bytes = 0;
+  for (auto& buffer : unused_iomgr_buffers_)
+    unused_iomgr_buffer_bytes += buffer->buffer_len();
+  if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) {
+    LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: "
+               << unused_iomgr_buffer_bytes_
+               << " vs. expected: " << unused_iomgr_buffer_bytes;
+    return false;
+  }
+  bool is_finished = !cancel_status_.ok() || eosr_queued_;
+  if (is_finished && !unused_iomgr_buffers_.empty()) {
+    LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size()
+               << " bytes: " << unused_iomgr_buffer_bytes_
+               << " cancel_status: " << cancel_status_.GetDetail()
+               << " eosr_queued: " << eosr_queued_;
+    return false;
+  }
+  if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) {
+    LOG(ERROR) << "Blocked despite having buffers: " << DebugString();
     return false;
   }
   return true;
@@ -203,13 +297,14 @@ bool ScanRange::Validate() {
 ScanRange::ScanRange()
   : RequestRange(RequestType::READ),
     num_remote_bytes_(0),
-    external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
-    mtime_(-1) {}
+    external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
 
 ScanRange::~ScanRange() {
   DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
   DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
       << "Cached buffer was not released.";
+  DCHECK_EQ(0, ready_buffers_.size());
+  DCHECK_EQ(0, num_buffers_in_reader_.Load());
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
@@ -245,24 +340,22 @@ void ScanRange::Reset(hdfsFS fs, const char* file, 
int64_t len, int64_t offset,
 void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   DCHECK(exclusive_hdfs_fh_ == nullptr);
   DCHECK(local_file_ == nullptr);
-  // Reader must provide MemTracker or a buffer.
-  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
-      || reader->mem_tracker_ != nullptr);
   io_mgr_ = io_mgr;
   reader_ = reader;
   local_file_ = nullptr;
   exclusive_hdfs_fh_ = nullptr;
   bytes_read_ = 0;
-  is_cancelled_ = false;
-  eosr_queued_= false;
-  eosr_returned_= false;
-  blocked_on_queue_ = false;
+  unused_iomgr_buffer_bytes_ = 0;
+  iomgr_buffer_bytes_returned_ = 0;
+  cancel_status_ = Status::OK();
+  eosr_queued_ = false;
+  blocked_on_buffer_ = false;
   DCHECK(Validate()) << DebugString();
 }
 
 Status ScanRange::Open(bool use_file_handle_cache) {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
+  RETURN_IF_ERROR(cancel_status_);
 
   if (fs_ != nullptr) {
     if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
@@ -302,9 +395,7 @@ Status ScanRange::Open(bool use_file_handle_cache) {
           "for file: $1: $2", offset_, file_, GetStrErrMsg()));
     }
   }
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
-  }
+  ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
   return Status::OK();
 }
 
@@ -356,9 +447,7 @@ void ScanRange::Close() {
     local_file_ = nullptr;
     closed_file = true;
   }
-  if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
-  }
+  if (closed_file) ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
 }
 
 int64_t ScanRange::MaxReadChunkSize() const {
@@ -386,7 +475,7 @@ int64_t ScanRange::MaxReadChunkSize() const {
 Status ScanRange::Read(
     uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
+  RETURN_IF_ERROR(cancel_status_);
 
   *eosr = false;
   *bytes_read = 0;
@@ -533,7 +622,7 @@ Status ScanRange::ReadFromCache(
 
   {
     unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    if (is_cancelled_) return Status::CANCELLED;
+    RETURN_IF_ERROR(cancel_status_);
 
     DCHECK(exclusive_hdfs_fh_ != nullptr);
     DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
@@ -568,20 +657,16 @@ Status ScanRange::ReadFromCache(
   }
 
   // Create a single buffer desc for the entire scan range and enqueue that.
-  // 'mem_tracker' is nullptr because the memory is owned by the HDFS java 
client,
-  // not the Impala backend.
+  // The memory is owned by the HDFS java client, not the Impala backend.
   unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new 
BufferDescriptor(
-      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
+      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0));
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
-  EnqueueBuffer(reader_lock, move(desc));
-  if (reader_->bytes_read_counter_ != nullptr) {
-    COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
-  }
+  EnqueueReadyBuffer(reader_lock, move(desc));
+  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
   *read_succeeded = true;
-  reader_->num_used_buffers_.Add(1);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 10a3424..8d28f8f 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -107,7 +107,6 @@ class MemTracker {
   /// destruction to prevent other threads from getting a reference to the 
MemTracker
   /// via its parent. Only used to deregister the query-level MemTracker from 
the
   /// global hierarchy.
-  /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be 
deleted.
   void CloseAndUnregisterFromParent();
 
   /// Include counters from a ReservationTracker in logs and other diagnostics.

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 770eaba..0d6c6f0 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -49,7 +49,7 @@ Status TestEnv::Init() {
   exec_env_.reset(new ExecEnv);
   // Populate the ExecEnv state that the backend tests need.
   exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
-  
RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker()));
+  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
   exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
   exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
   if (have_tmp_file_mgr_args_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index 318fdbe..6f82658 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -25,6 +25,7 @@
 #include <gtest/gtest.h>
 
 #include "common/init.h"
+#include "runtime/io/request-context.h"
 #include "runtime/test-env.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "runtime/tmp-file-mgr.h"
@@ -135,7 +136,7 @@ class TmpFileMgrTest : public ::testing::Test {
 
   /// Helper to cancel the FileGroup RequestContext.
   static void CancelIoContext(TmpFileMgr::FileGroup* group) {
-    group->io_mgr_->CancelContext(group->io_ctx_.get());
+    group->io_ctx_->Cancel();
   }
 
   /// Helper to get the # of bytes allocated by the group. Validates that the 
sum across

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index b44a5c4..04e15d4 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -241,7 +241,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, 
DiskIoMgr* io_mgr,
     next_allocation_index_(0),
     free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
-  io_ctx_ = io_mgr_->RegisterContext(nullptr);
+  io_ctx_ = io_mgr_->RegisterContext();
 }
 
 TmpFileMgr::FileGroup::~FileGroup() {
@@ -379,7 +379,10 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* 
handle, MemRange buffer) {
       BufferOpts::ReadInto(buffer.data(), buffer.len()));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
-  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, 
true));
+  bool needs_buffers;
+  RETURN_IF_ERROR(io_mgr_->StartScanRange(
+      io_ctx_.get(), handle->read_range_, &needs_buffers));
+  DCHECK(!needs_buffers) << "Already provided a buffer";
   return Status::OK();
 }
 
@@ -409,7 +412,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* 
handle, MemRange buf
   }
 exit:
   // Always return the buffer before exiting to avoid leaking it.
-  if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer));
+  if (io_mgr_buffer != nullptr) 
handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
   handle->read_range_ = nullptr;
   return status;
 }
@@ -505,11 +508,20 @@ TmpFileMgr::WriteHandle::WriteHandle(
     is_cancelled_(false),
     write_in_flight_(false) {}
 
+TmpFileMgr::WriteHandle::~WriteHandle() {
+  DCHECK(!write_in_flight_);
+  DCHECK(read_range_ == nullptr);
+}
+
 string TmpFileMgr::WriteHandle::TmpFilePath() const {
   if (file_ == nullptr) return "";
   return file_->path();
 }
 
+int64_t TmpFileMgr::WriteHandle::len() const {
+  return write_range_->len();
+}
+
 Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* 
io_ctx,
     File* file, int64_t offset, MemRange buffer,
     WriteRange::WriteDoneCallback callback) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index b6fce43..f7dd9ed 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,7 +28,6 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
-#include "runtime/io/request-ranges.h"
 #include "util/collection-metrics.h"
 #include "util/condition-variable.h"
 #include "util/mem-range.h"
@@ -37,6 +36,12 @@
 #include "util/spinlock.h"
 
 namespace impala {
+namespace io {
+  class DiskIoMgr;
+  class RequestContext;
+  class ScanRange;
+  class WriteRange;
+}
 
 /// TmpFileMgr provides an abstraction for management of temporary (a.k.a. 
scratch) files
 /// on the filesystem and I/O to and from them. TmpFileMgr manages multiple 
scratch
@@ -84,6 +89,7 @@ class TmpFileMgr {
   /// Needs to be public for TmpFileMgrTest.
   typedef int DeviceId;
 
+  /// Same typedef as io::WriteRange::WriteDoneCallback.
   typedef std::function<void(const Status&)> WriteDoneCallback;
 
   /// Represents a group of temporary files - one per disk with a scratch 
directory. The
@@ -277,10 +283,7 @@ class TmpFileMgr {
    public:
     /// The write must be destroyed by passing it to FileGroup - destroying it 
before
     /// the write completes is an error.
-    ~WriteHandle() {
-      DCHECK(!write_in_flight_);
-      DCHECK(read_range_ == nullptr);
-    }
+    ~WriteHandle();
 
     /// Cancel any in-flight read synchronously.
     void CancelRead();
@@ -290,7 +293,7 @@ class TmpFileMgr {
     std::string TmpFilePath() const;
 
     /// The length of the write range in bytes.
-    int64_t len() const { return write_range_->len(); }
+    int64_t len() const;
 
     std::string DebugString();
 
@@ -305,7 +308,7 @@ class TmpFileMgr {
     /// failure and 'is_cancelled_' is set to true on failure.
     Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
         int64_t offset, MemRange buffer,
-        io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
+        WriteDoneCallback callback) WARN_UNUSED_RESULT;
 
     /// Retry the write after the initial write failed with an error, instead 
writing to
     /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 5f8d443..6f70727 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -258,6 +258,17 @@ TEST(BitUtil, Log2) {
   EXPECT_EQ(BitUtil::Log2CeilingNonZero64(ULLONG_MAX), 64);
 }
 
+TEST(BitUtil, RoundToPowerOfTwo) {
+  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(9));
+  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(15));
+  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(16));
+  EXPECT_EQ(32, BitUtil::RoundUpToPowerOfTwo(17));
+  EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(9));
+  EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(15));
+  EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(16));
+  EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(17));
+}
+
 TEST(BitUtil, RoundUpToPowerOf2) {
   EXPECT_EQ(BitUtil::RoundUpToPowerOf2(7, 8), 8);
   EXPECT_EQ(BitUtil::RoundUpToPowerOf2(8, 8), 8);

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 5c9a29b..8a65509 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -98,6 +98,12 @@ class BitUtil {
     return v;
   }
 
+  /// Returns the largest power of two <= v.
+  static inline int64_t RoundDownToPowerOfTwo(int64_t v) {
+    int64_t v_rounded_up = RoundUpToPowerOfTwo(v);
+    return v_rounded_up == v ? v : v_rounded_up / 2;
+  }
+
   /// Returns 'value' rounded up to the nearest multiple of 'factor' when 
factor is
   /// a power of two
   static inline int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) {
@@ -105,7 +111,7 @@ class BitUtil {
     return (value + (factor - 1)) & ~(factor - 1);
   }
 
-  static inline int RoundDownToPowerOf2(int value, int factor) {
+  static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
     DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
     return value & ~(factor - 1);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index c2a2644..815e4af 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -46,12 +46,6 @@ const char* ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES =
     "impala-server.hash-table.total-bytes";
 const char* ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES =
     "impala-server.io-mgr.num-open-files";
-const char* ImpaladMetricKeys::IO_MGR_NUM_BUFFERS =
-    "impala-server.io-mgr.num-buffers";
-const char* ImpaladMetricKeys::IO_MGR_TOTAL_BYTES =
-    "impala-server.io-mgr.total-bytes";
-const char* ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS =
-    "impala-server.io-mgr.num-unused-buffers";
 const char* ImpaladMetricKeys::IO_MGR_BYTES_READ =
     "impala-server.io-mgr.bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ =
@@ -211,11 +205,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
 
   // Initialize IO mgr metrics
-  IO_MGR_NUM_OPEN_FILES = 
m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
-  IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
-  IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
-  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
-      ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
+  IO_MGR_NUM_OPEN_FILES = m->AddGauge(
+      ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
   IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
   IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/util/impalad-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index b49caf9..7de7aa8 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -67,15 +67,6 @@ class ImpaladMetricKeys {
   /// Number of files currently opened by the io mgr
   static const char* IO_MGR_NUM_OPEN_FILES;
 
-  /// Number of IO buffers allocated by the io mgr
-  static const char* IO_MGR_NUM_BUFFERS;
-
-  /// Number of bytes used by IO buffers (used and unused).
-  static const char* IO_MGR_TOTAL_BYTES;
-
-  /// Number of IO buffers that are currently unused (and can be GC'ed)
-  static const char* IO_MGR_NUM_UNUSED_BUFFERS;
-
   /// Total number of bytes read by the io mgr
   static const char* IO_MGR_BYTES_READ;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 01698ce..22198aa 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -240,6 +240,9 @@ struct THdfsScanNode {
   // The byte offset of the slot for Parquet metadata if Parquet count star 
optimization
   // is enabled.
   10: optional i32 parquet_count_star_slot_offset
+
+  // The ideal memory reservation in bytes to process an input split.
+  11: optional i64 ideal_scan_range_reservation
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 4f0a0e1..aae3863 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -163,6 +163,25 @@ public class SlotDescriptor {
   }
 
   /**
+   * Checks if this descriptor describes an array "pos" pseudo-column.
+   *
+   * Note: checking whether the column is null distinguishes between top-level 
columns
+   * and nested types. This check more specifically looks just for a reference 
to the
+   * "pos" field of an array type.
+   */
+  public boolean isArrayPosRef() {
+    if (parent_ == null) return false;
+    Type parentType = parent_.getType();
+    if (parentType instanceof CollectionStructType) {
+      if (((CollectionStructType)parentType).isArrayStruct() &&
+          label_.equals(Path.ARRAY_POS_FIELD_NAME)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Assembles the absolute materialized path to this slot starting from the 
schema
    * root. The materialized path points to the first non-struct schema element 
along the
    * path starting from the parent's tuple path to this slot's path.

Reply via email to